This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new e769f42398 [KYUUBI #6884] [FEATURE] Support to reassign the batches to
alternative kyuubi instance in case kyuubi instance lost
e769f42398 is described below
commit e769f4239891536e9850c11b9661ea65d82a01f9
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Jun 22 22:36:51 2025 -0700
[KYUUBI #6884] [FEATURE] Support to reassign the batches to alternative
kyuubi instance in case kyuubi instance lost
### Why are the changes needed?
Support to reassign the batches to alternative kyuubi instance in case
kyuubi instance lost.
https://github.com/apache/kyuubi/issues/6884
### How was this patch tested?
Unit Test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #7037 from George314159/6884.
Closes #6884
8565d4aaa [Wang, Fei] KYUUBI_SESSION_CONNECTION_URL_KEY
22d4539e2 [Wang, Fei] admin
075654cb3 [Wang, Fei] check admin
5654a99f4 [Wang, Fei] log and lock
a19e2edf5 [Wang, Fei] minor comments
a60f23ba3 [George314159] refine
760e10f89 [George314159] Update Based On Comments
75f1ee2a9 [Fei Wang] ping (#1)
f42bcaf9a [George314159] Update Based on Comments
1bea70ed6 [George314159] [KYUUBI-6884] Support to reassign the batches to
alternative kyuubi instance in case kyuubi instance lost
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: George314159 <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../java/org/apache/kyuubi/client/BaseRestApi.java | 45 ++++++++++
.../client/api/v1/dto/ReassignBatchRequest.java | 58 +++++++++++++
.../client/api/v1/dto/ReassignBatchResponse.java | 81 ++++++++++++++++++
.../kyuubi/server/KyuubiRestFrontendService.scala | 60 ++++++++++++-
.../kyuubi/server/api/v1/BatchesResource.scala | 36 ++++++++
.../kyuubi/server/api/v1/InternalRestClient.scala | 6 +-
.../apache/kyuubi/session/KyuubiBatchSession.scala | 4 +-
.../kyuubi/session/KyuubiSessionManager.scala | 64 +++++++++++---
.../server/api/v1/BatchesResourceSuite.scala | 99 ++++++++++++++++++++++
.../server/rest/client/BaseRestApiSuite.scala | 37 ++++++++
10 files changed, 475 insertions(+), 15 deletions(-)
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BaseRestApi.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BaseRestApi.java
new file mode 100644
index 0000000000..06792a741f
--- /dev/null
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BaseRestApi.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client;
+
+import org.apache.kyuubi.client.api.v1.dto.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseRestApi {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseRestApi.class);
+ private KyuubiRestClient client;
+
+ private BaseRestApi() {}
+
+ public BaseRestApi(KyuubiRestClient client) {
+ this.client = client;
+ }
+
+ public String ping() {
+ return this.getClient().get("ping", null, client.getAuthHeader());
+ }
+
+ public VersionInfo getVersionInfo() {
+ return this.getClient().get("version", null, VersionInfo.class,
client.getAuthHeader());
+ }
+
+ private IRestClient getClient() {
+ return this.client.getHttpClient();
+ }
+}
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchRequest.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchRequest.java
new file mode 100644
index 0000000000..3daa21083a
--- /dev/null
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client.api.v1.dto;
+
+import java.util.Objects;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class ReassignBatchRequest {
+ private String kyuubiInstance;
+
+ public ReassignBatchRequest() {}
+
+ public ReassignBatchRequest(String kyuubiInstance) {
+ this.kyuubiInstance = kyuubiInstance;
+ }
+
+ public String getKyuubiInstance() {
+ return kyuubiInstance;
+ }
+
+ public void setKyuubiInstance(String kyuubiInstance) {
+ this.kyuubiInstance = kyuubiInstance;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ReassignBatchRequest that = (ReassignBatchRequest) o;
+ return Objects.equals(getKyuubiInstance(), that.getKyuubiInstance());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getKyuubiInstance());
+ }
+
+ @Override
+ public String toString() {
+ return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE);
+ }
+}
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchResponse.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchResponse.java
new file mode 100644
index 0000000000..15c766b5bb
--- /dev/null
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchResponse.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client.api.v1.dto;
+
+import java.util.*;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class ReassignBatchResponse {
+ private List<String> batchIds = new ArrayList<>(0);
+ private String originalKyuubiInstance = null;
+ private String newKyuubiInstance = null;
+
+ public ReassignBatchResponse() {}
+
+ public ReassignBatchResponse(
+ List<String> batchIds, String originalKyuubiInstance, String
newKyuubiInstance) {
+ this.originalKyuubiInstance = originalKyuubiInstance;
+ this.newKyuubiInstance = newKyuubiInstance;
+ this.batchIds = batchIds;
+ }
+
+ public List<String> getBatchIds() {
+ return batchIds;
+ }
+
+ public void setBatchIds(List<String> batchIds) {
+ this.batchIds = batchIds;
+ }
+
+ public String getOriginalKyuubiInstance() {
+ return originalKyuubiInstance;
+ }
+
+ public void setOriginalKyuubiInstance(String originalKyuubiInstance) {
+ this.originalKyuubiInstance = originalKyuubiInstance;
+ }
+
+ public String getNewKyuubiInstance() {
+ return newKyuubiInstance;
+ }
+
+ public void setNewKyuubiInstance(String newKyuubiInstance) {
+ this.newKyuubiInstance = newKyuubiInstance;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ReassignBatchResponse that = (ReassignBatchResponse) o;
+ return Objects.equals(getBatchIds(), that.getBatchIds())
+ && Objects.equals(getOriginalKyuubiInstance(),
that.getOriginalKyuubiInstance())
+ && Objects.equals(getNewKyuubiInstance(), that.getNewKyuubiInstance());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(batchIds, originalKyuubiInstance, newKyuubiInstance);
+ }
+
+ @Override
+ public String toString() {
+ return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE);
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 7fa111c7ff..787ac0b047 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.server
import java.util.EnumSet
import java.util.concurrent.{Future, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.locks.ReentrantLock
import javax.servlet.DispatcherType
import javax.ws.rs.WebApplicationException
import javax.ws.rs.core.Response.Status
@@ -167,8 +168,18 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
TimeUnit.MILLISECONDS)
}
+ private val batchRecoveryLock: ReentrantLock = new ReentrantLock()
+ private def withBatchRecoveryLockRequired[T](block: => T): T = {
+ batchRecoveryLock.lock()
+ try {
+ block
+ } finally {
+ batchRecoveryLock.unlock()
+ }
+ }
+
@VisibleForTesting
- private[kyuubi] def recoverBatchSessions(): Unit = {
+ private[kyuubi] def recoverBatchSessions(): Unit =
withBatchRecoveryLockRequired {
val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
val batchRecoveryExecutor =
ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads,
"batch-recovery-executor")
@@ -206,6 +217,53 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
}
}
+ private[kyuubi] def recoverBatchSessionsFromReassign(batchIds: Seq[String]):
Seq[String] =
+ withBatchRecoveryLockRequired {
+ val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
+ val batchRecoveryExecutor =
+ ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads,
"batch-reassign-recovery-executor")
+ try {
+ val batchSessionsToRecover =
+ sessionManager.getSpecificBatchSessionsToRecover(batchIds,
connectionUrl)
+ val pendingRecoveryTasksCount = new AtomicInteger(0)
+ val tasks = batchSessionsToRecover.flatMap { batchSession =>
+ val batchId = batchSession.batchJobSubmissionOp.batchId
+ try {
+ val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
+ Utils.tryLogNonFatalError {
+ info(s"Recovering batch[$batchId] from reassign")
+ sessionManager.openBatchSession(batchSession)
+ })
+ Some(task -> batchId)
+ } catch {
+ case e: Throwable =>
+ error(s"Error while submitting batch[$batchId] for recovery", e)
+ None
+ }
+ }
+
+ pendingRecoveryTasksCount.addAndGet(tasks.size)
+
+ val finishedBatchIds: Seq[String] = tasks.flatMap { case (task,
batchId) =>
+ try {
+ task.get()
+ val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
+ info(s"Batch[$batchId] recovery task terminated, current pending
tasks $pendingTasks")
+ Some(batchId)
+ } catch {
+ case e: Throwable =>
+ error(s"Error while recovering batch[$batchId]", e)
+ val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
+ info(s"Batch[$batchId] recovery task terminated, current pending
tasks $pendingTasks")
+ None
+ }
+ }
+ finishedBatchIds
+ } finally {
+ ThreadUtils.shutdown(batchRecoveryExecutor)
+ }
+ }
+
private def getBatchPendingMaxElapse(): Long = {
val batchPendingElapseTimes = sessionManager.allSessions().map {
case session: KyuubiBatchSession =>
session.batchJobSubmissionOp.getPendingElapsedTime
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 4632527559..150b530dca 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -575,6 +575,42 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
}
}
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON,
+ schema = new Schema(implementation = classOf[ReassignBatchResponse]))),
+ description =
+ "Reassign batch sessions on an unreachable kyuubi instance to the
current kyuubi instance")
+ @POST
+ @Path("/reassign")
+ @Consumes(Array(MediaType.APPLICATION_JSON))
+ def reassignBatchSessions(request: ReassignBatchRequest):
ReassignBatchResponse = {
+ val userName = fe.getSessionUser(Map.empty[String, String])
+ val ipAddress = fe.getIpAddress
+ val kyuubiInstance = request.getKyuubiInstance
+ val newKyuubiInstance = fe.connectionUrl
+ info(s"Received reassign $kyuubiInstance batch sessions request from
$userName/$ipAddress")
+ if (!fe.isAdministrator(userName)) {
+ throw new ForbiddenException(s"$userName is not allowed to reassign the
batches")
+ }
+ if (kyuubiInstance == newKyuubiInstance) {
+ throw new IllegalStateException(s"KyuubiInstance is alive:
$kyuubiInstance")
+ }
+ val internalRestClient = getInternalRestClient(kyuubiInstance)
+ if (!Utils.isTesting && internalRestClient.pingAble()) {
+ throw new IllegalStateException(s"KyuubiInstance is alive:
$kyuubiInstance")
+ }
+ try {
+ val batchIds = sessionManager.reassignBatchSessions(kyuubiInstance,
newKyuubiInstance)
+ val recoveredBatchIds = fe.recoverBatchSessionsFromReassign(batchIds)
+ new ReassignBatchResponse(recoveredBatchIds.asJava, kyuubiInstance,
newKyuubiInstance)
+ } catch {
+ case e: Exception =>
+ throw new Exception(s"Error reassign batches from $kyuubiInstance to
$newKyuubiInstance", e)
+ }
+ }
+
private def handleUploadingFiles(
batchId: String,
request: BatchRequest,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
index e6f97efb5e..36dd633161 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
@@ -20,8 +20,9 @@ package org.apache.kyuubi.server.api.v1
import java.util.Base64
import scala.collection.JavaConverters._
+import scala.util.Try
-import org.apache.kyuubi.client.{BatchRestApi, KyuubiRestClient}
+import org.apache.kyuubi.client.{BaseRestApi, BatchRestApi, KyuubiRestClient}
import org.apache.kyuubi.client.api.v1.dto.{Batch, CloseBatchResponse,
OperationLog}
import org.apache.kyuubi.client.auth.AuthHeaderGenerator
import org.apache.kyuubi.server.http.authentication.AuthSchemes
@@ -53,6 +54,9 @@ class InternalRestClient(
}
private val internalBatchRestApi = new BatchRestApi(initKyuubiRestClient())
+ private val internalBaseRestApi = new BaseRestApi(initKyuubiRestClient())
+
+ def pingAble(): Boolean = Try(internalBaseRestApi.ping()).isSuccess
def getBatch(user: String, clientIp: String, batchId: String): Batch = {
withAuthUser(user) {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index ce8e4d0013..3fe518e0c1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY,
KYUUBI_SESSION_CONNECTION_URL_KEY}
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -54,6 +54,8 @@ class KyuubiBatchSession(
conf,
sessionManager) {
override val sessionType: SessionType = SessionType.BATCH
+ override val connectionUrl: String =
+
metadata.map(_.kyuubiInstance).getOrElse(conf.getOrElse(KYUUBI_SESSION_CONNECTION_URL_KEY,
""))
override val handle: SessionHandle = {
val batchId =
metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9423521f6b..344da0e71e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -320,18 +320,58 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
kyuubiInstance,
0,
Int.MaxValue).map { metadata =>
- createBatchSession(
- metadata.username,
- "anonymous",
- metadata.ipAddress,
- metadata.requestConf,
- metadata.engineType,
- Option(metadata.requestName),
- metadata.resource,
- metadata.className,
- metadata.requestArgs,
- Some(metadata),
- fromRecovery = true)
+ createBatchSessionFromRecovery(metadata)
+ }).getOrElse(Seq.empty)
+ }
+ }
+
+ def getSpecificBatchSessionsToRecover(
+ batchIds: Seq[String],
+ kyuubiInstance: String): Seq[KyuubiBatchSession] = {
+ val batchStatesToRecovery = Set(OperationState.PENDING,
OperationState.RUNNING)
+ batchIds.flatMap { batchId =>
+ getBatchSession(SessionHandle.fromUUID(batchId)) match {
+ case Some(_) =>
+ warn(s"Batch session $batchId is already active, skipping recovery.")
+ None
+ case None =>
+ getBatchMetadata(batchId)
+ .filter(m =>
+ m.kyuubiInstance == kyuubiInstance &&
batchStatesToRecovery.contains(m.opState))
+ .flatMap { metadata =>
Some(createBatchSessionFromRecovery(metadata)) }
+ }
+ }
+ }
+
+ private def createBatchSessionFromRecovery(metadata: Metadata):
KyuubiBatchSession = {
+ createBatchSession(
+ metadata.username,
+ "anonymous",
+ metadata.ipAddress,
+ metadata.requestConf,
+ metadata.engineType,
+ Option(metadata.requestName),
+ metadata.resource,
+ metadata.className,
+ metadata.requestArgs,
+ Some(metadata),
+ fromRecovery = true)
+ }
+
+ def reassignBatchSessions(
+ kyuubiInstance: String,
+ newKyuubiInstance: String): Seq[String] = {
+ Seq(OperationState.PENDING, OperationState.RUNNING).flatMap {
stateToRecover =>
+ metadataManager.map(_.getBatchesRecoveryMetadata(
+ stateToRecover.toString,
+ kyuubiInstance,
+ 0,
+ Int.MaxValue).map { metadata =>
+ updateMetadata(Metadata(
+ identifier = metadata.identifier,
+ kyuubiInstance = newKyuubiInstance))
+ info(s"Reassign batch ${metadata.identifier} from $kyuubiInstance to
$newKyuubiInstance")
+ metadata.identifier
}).getOrElse(Seq.empty)
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 4d32de7026..f836f6f157 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -630,6 +630,105 @@ abstract class BatchesResourceSuiteBase extends
KyuubiFunSuite
Int.MaxValue).size === 2)
}
+ test("reassign batch") {
+ val sessionManager =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+ val kyuubiInstance = fe.connectionUrl
+
+ assert(sessionManager.getActiveUserSessionCount === 0)
+ val batchId1 = UUID.randomUUID().toString
+ val batchId2 = UUID.randomUUID().toString
+
+ val batchMetadata = Metadata(
+ identifier = batchId1,
+ sessionType = SessionType.BATCH,
+ realUser = "kyuubi",
+ username = "kyuubi",
+ ipAddress = "localhost",
+ kyuubiInstance = "other_kyuubi_instance:10099",
+ state = OperationState.PENDING.toString,
+ resource = sparkBatchTestResource.get,
+ className = sparkBatchTestMainClass,
+ requestName = "PENDING_RECOVERY",
+ requestConf = Map("spark.master" -> "local"),
+ requestArgs = Seq.empty,
+ createTime = System.currentTimeMillis(),
+ engineType = "SPARK")
+
+ val batchMetadata2 = batchMetadata.copy(
+ identifier = batchId2,
+ requestName = "RUNNING_RECOVERY")
+ sessionManager.insertMetadata(batchMetadata)
+ sessionManager.insertMetadata(batchMetadata2)
+
+
assert(sessionManager.getBatchFromMetadataStore(batchId1).map(_.getState).contains("PENDING"))
+
assert(sessionManager.getBatchFromMetadataStore(batchId2).map(_.getState).contains("PENDING"))
+
+ val sparkBatchProcessBuilder = new SparkBatchProcessBuilder(
+ "kyuubi",
+ conf,
+ batchId2,
+ "RUNNING_RECOVERY",
+ sparkBatchTestResource,
+ sparkBatchTestMainClass,
+ batchMetadata2.requestConf,
+ batchMetadata2.requestArgs,
+ None)
+ sparkBatchProcessBuilder.start
+
+ var applicationStatus: Option[ApplicationInfo] = None
+ eventually(timeout(5.seconds)) {
+ applicationStatus =
+
sessionManager.applicationManager.getApplicationInfo(ApplicationManagerInfo(None),
batchId2)
+ assert(applicationStatus.isDefined)
+ }
+
+ val metadataToUpdate = Metadata(
+ identifier = batchId2,
+ state = OperationState.RUNNING.toString,
+ engineId = applicationStatus.get.id,
+ engineName = applicationStatus.get.name,
+ engineUrl = applicationStatus.get.url.orNull,
+ engineState = applicationStatus.get.state.toString,
+ engineError = applicationStatus.get.error)
+ sessionManager.updateMetadata(metadataToUpdate)
+
+ val requestObj = new ReassignBatchRequest("other_kyuubi_instance:10099")
+ val response = webTarget.path("api/v1/batches/reassign")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .header(AUTHORIZATION_HEADER,
basicAuthorizationHeader(Utils.currentUser))
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+ assert(response.getStatus === 200)
+ val batch = response.readEntity(classOf[ReassignBatchResponse])
+ assert(batch.getBatchIds.size() === 2)
+ assert(batch.getBatchIds.contains(batchId1))
+ assert(batch.getBatchIds.contains(batchId2))
+
assert(sessionManager.getBatchMetadata(batchId1).map(_.kyuubiInstance).contains(kyuubiInstance))
+
assert(sessionManager.getBatchMetadata(batchId2).map(_.kyuubiInstance).contains(kyuubiInstance))
+ assert(sessionManager.getActiveUserSessionCount === 2)
+
+ val sessionHandle1 = SessionHandle.fromUUID(batchId1)
+ val sessionHandle2 = SessionHandle.fromUUID(batchId2)
+ val session1 =
sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSession]
+ val session2 =
sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSession]
+ assert(session1.createTime === batchMetadata.createTime)
+ assert(session2.createTime === batchMetadata2.createTime)
+
+ eventually(timeout(10.seconds)) {
+ val batch1State = session1.batchJobSubmissionOp.getStatus.state
+ assert(batch1State === OperationState.RUNNING ||
OperationState.isTerminal(batch1State))
+ assert(session1.batchJobSubmissionOp.builder.processLaunched)
+
+ val batch2State = session2.batchJobSubmissionOp.getStatus.state
+ assert(batch2State === OperationState.RUNNING ||
OperationState.isTerminal(batch2State))
+ assert(!session2.batchJobSubmissionOp.builder.processLaunched)
+ }
+
+ assert(sessionManager.getBatchesFromMetadataStore(
+ MetadataFilter(engineType = "SPARK"),
+ 0,
+ Int.MaxValue).size === 2)
+ }
+
test("get local log internal redirection") {
val sessionManager =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val metadata = Metadata(
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BaseRestApiSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BaseRestApiSuite.scala
new file mode 100644
index 0000000000..15381f7c30
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BaseRestApiSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.rest.client
+
+import org.apache.kyuubi.{KYUUBI_VERSION, RestClientTestHelper}
+import org.apache.kyuubi.client.{BaseRestApi, KyuubiRestClient}
+
+class BaseRestApiSuite extends RestClientTestHelper {
+ test("base rest apis") {
+ val basicKyuubiRestClient: KyuubiRestClient =
+ KyuubiRestClient.builder(baseUri.toString)
+ .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
+ .username(ldapUser)
+ .password(ldapUserPasswd)
+ .socketTimeout(30000)
+ .build()
+ val baseRestApi: BaseRestApi = new BaseRestApi(basicKyuubiRestClient)
+
+ assert(baseRestApi.ping() == "pong")
+ assert(baseRestApi.getVersionInfo.getVersion == KYUUBI_VERSION)
+ }
+}