This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new e769f42398 [KYUUBI #6884] [FEATURE] Support to reassign the batches to 
alternative kyuubi instance in case kyuubi instance lost
e769f42398 is described below

commit e769f4239891536e9850c11b9661ea65d82a01f9
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Jun 22 22:36:51 2025 -0700

    [KYUUBI #6884] [FEATURE] Support to reassign the batches to alternative 
kyuubi instance in case kyuubi instance lost
    
    ### Why are the changes needed?
    
    Support to reassign the batches to alternative kyuubi instance in case 
kyuubi instance lost.
    https://github.com/apache/kyuubi/issues/6884
    
    ### How was this patch tested?
    
    Unit Test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #7037 from George314159/6884.
    
    Closes #6884
    
    8565d4aaa [Wang, Fei] KYUUBI_SESSION_CONNECTION_URL_KEY
    22d4539e2 [Wang, Fei] admin
    075654cb3 [Wang, Fei] check admin
    5654a99f4 [Wang, Fei] log and lock
    a19e2edf5 [Wang, Fei] minor comments
    a60f23ba3 [George314159] refine
    760e10f89 [George314159] Update Based On Comments
    75f1ee2a9 [Fei Wang] ping (#1)
    f42bcaf9a [George314159] Update Based on Comments
    1bea70ed6 [George314159] [KYUUBI-6884] Support to reassign the batches to 
alternative kyuubi instance in case kyuubi instance lost
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: George314159 <[email protected]>
    Co-authored-by: Fei Wang <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../java/org/apache/kyuubi/client/BaseRestApi.java | 45 ++++++++++
 .../client/api/v1/dto/ReassignBatchRequest.java    | 58 +++++++++++++
 .../client/api/v1/dto/ReassignBatchResponse.java   | 81 ++++++++++++++++++
 .../kyuubi/server/KyuubiRestFrontendService.scala  | 60 ++++++++++++-
 .../kyuubi/server/api/v1/BatchesResource.scala     | 36 ++++++++
 .../kyuubi/server/api/v1/InternalRestClient.scala  |  6 +-
 .../apache/kyuubi/session/KyuubiBatchSession.scala |  4 +-
 .../kyuubi/session/KyuubiSessionManager.scala      | 64 +++++++++++---
 .../server/api/v1/BatchesResourceSuite.scala       | 99 ++++++++++++++++++++++
 .../server/rest/client/BaseRestApiSuite.scala      | 37 ++++++++
 10 files changed, 475 insertions(+), 15 deletions(-)

diff --git 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BaseRestApi.java 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BaseRestApi.java
new file mode 100644
index 0000000000..06792a741f
--- /dev/null
+++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BaseRestApi.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client;
+
+import org.apache.kyuubi.client.api.v1.dto.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseRestApi {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRestApi.class);
+  private KyuubiRestClient client;
+
+  private BaseRestApi() {}
+
+  public BaseRestApi(KyuubiRestClient client) {
+    this.client = client;
+  }
+
+  public String ping() {
+    return this.getClient().get("ping", null, client.getAuthHeader());
+  }
+
+  public VersionInfo getVersionInfo() {
+    return this.getClient().get("version", null, VersionInfo.class, 
client.getAuthHeader());
+  }
+
+  private IRestClient getClient() {
+    return this.client.getHttpClient();
+  }
+}
diff --git 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchRequest.java
 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchRequest.java
new file mode 100644
index 0000000000..3daa21083a
--- /dev/null
+++ 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client.api.v1.dto;
+
+import java.util.Objects;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class ReassignBatchRequest {
+  private String kyuubiInstance;
+
+  public ReassignBatchRequest() {}
+
+  public ReassignBatchRequest(String kyuubiInstance) {
+    this.kyuubiInstance = kyuubiInstance;
+  }
+
+  public String getKyuubiInstance() {
+    return kyuubiInstance;
+  }
+
+  public void setKyuubiInstance(String kyuubiInstance) {
+    this.kyuubiInstance = kyuubiInstance;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReassignBatchRequest that = (ReassignBatchRequest) o;
+    return Objects.equals(getKyuubiInstance(), that.getKyuubiInstance());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getKyuubiInstance());
+  }
+
+  @Override
+  public String toString() {
+    return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE);
+  }
+}
diff --git 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchResponse.java
 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchResponse.java
new file mode 100644
index 0000000000..15c766b5bb
--- /dev/null
+++ 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ReassignBatchResponse.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client.api.v1.dto;
+
+import java.util.*;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class ReassignBatchResponse {
+  private List<String> batchIds = new ArrayList<>(0);
+  private String originalKyuubiInstance = null;
+  private String newKyuubiInstance = null;
+
+  public ReassignBatchResponse() {}
+
+  public ReassignBatchResponse(
+      List<String> batchIds, String originalKyuubiInstance, String 
newKyuubiInstance) {
+    this.originalKyuubiInstance = originalKyuubiInstance;
+    this.newKyuubiInstance = newKyuubiInstance;
+    this.batchIds = batchIds;
+  }
+
+  public List<String> getBatchIds() {
+    return batchIds;
+  }
+
+  public void setBatchIds(List<String> batchIds) {
+    this.batchIds = batchIds;
+  }
+
+  public String getOriginalKyuubiInstance() {
+    return originalKyuubiInstance;
+  }
+
+  public void setOriginalKyuubiInstance(String originalKyuubiInstance) {
+    this.originalKyuubiInstance = originalKyuubiInstance;
+  }
+
+  public String getNewKyuubiInstance() {
+    return newKyuubiInstance;
+  }
+
+  public void setNewKyuubiInstance(String newKyuubiInstance) {
+    this.newKyuubiInstance = newKyuubiInstance;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReassignBatchResponse that = (ReassignBatchResponse) o;
+    return Objects.equals(getBatchIds(), that.getBatchIds())
+        && Objects.equals(getOriginalKyuubiInstance(), 
that.getOriginalKyuubiInstance())
+        && Objects.equals(getNewKyuubiInstance(), that.getNewKyuubiInstance());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(batchIds, originalKyuubiInstance, newKyuubiInstance);
+  }
+
+  @Override
+  public String toString() {
+    return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE);
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 7fa111c7ff..787ac0b047 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.server
 import java.util.EnumSet
 import java.util.concurrent.{Future, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.locks.ReentrantLock
 import javax.servlet.DispatcherType
 import javax.ws.rs.WebApplicationException
 import javax.ws.rs.core.Response.Status
@@ -167,8 +168,18 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
       TimeUnit.MILLISECONDS)
   }
 
+  private val batchRecoveryLock: ReentrantLock = new ReentrantLock()
+  private def withBatchRecoveryLockRequired[T](block: => T): T = {
+    batchRecoveryLock.lock()
+    try {
+      block
+    } finally {
+      batchRecoveryLock.unlock()
+    }
+  }
+
   @VisibleForTesting
-  private[kyuubi] def recoverBatchSessions(): Unit = {
+  private[kyuubi] def recoverBatchSessions(): Unit = 
withBatchRecoveryLockRequired {
     val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
     val batchRecoveryExecutor =
       ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, 
"batch-recovery-executor")
@@ -206,6 +217,53 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
     }
   }
 
+  private[kyuubi] def recoverBatchSessionsFromReassign(batchIds: Seq[String]): 
Seq[String] =
+    withBatchRecoveryLockRequired {
+      val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
+      val batchRecoveryExecutor =
+        ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, 
"batch-reassign-recovery-executor")
+      try {
+        val batchSessionsToRecover =
+          sessionManager.getSpecificBatchSessionsToRecover(batchIds, 
connectionUrl)
+        val pendingRecoveryTasksCount = new AtomicInteger(0)
+        val tasks = batchSessionsToRecover.flatMap { batchSession =>
+          val batchId = batchSession.batchJobSubmissionOp.batchId
+          try {
+            val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
+              Utils.tryLogNonFatalError {
+                info(s"Recovering batch[$batchId] from reassign")
+                sessionManager.openBatchSession(batchSession)
+              })
+            Some(task -> batchId)
+          } catch {
+            case e: Throwable =>
+              error(s"Error while submitting batch[$batchId] for recovery", e)
+              None
+          }
+        }
+
+        pendingRecoveryTasksCount.addAndGet(tasks.size)
+
+        val finishedBatchIds: Seq[String] = tasks.flatMap { case (task, 
batchId) =>
+          try {
+            task.get()
+            val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
+            info(s"Batch[$batchId] recovery task terminated, current pending 
tasks $pendingTasks")
+            Some(batchId)
+          } catch {
+            case e: Throwable =>
+              error(s"Error while recovering batch[$batchId]", e)
+              val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
+              info(s"Batch[$batchId] recovery task terminated, current pending 
tasks $pendingTasks")
+              None
+          }
+        }
+        finishedBatchIds
+      } finally {
+        ThreadUtils.shutdown(batchRecoveryExecutor)
+      }
+    }
+
   private def getBatchPendingMaxElapse(): Long = {
     val batchPendingElapseTimes = sessionManager.allSessions().map {
       case session: KyuubiBatchSession => 
session.batchJobSubmissionOp.getPendingElapsedTime
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 4632527559..150b530dca 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -575,6 +575,42 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
     }
   }
 
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[ReassignBatchResponse]))),
+    description =
+      "Reassign batch sessions on an unreachable kyuubi instance to the 
current kyuubi instance")
+  @POST
+  @Path("/reassign")
+  @Consumes(Array(MediaType.APPLICATION_JSON))
+  def reassignBatchSessions(request: ReassignBatchRequest): 
ReassignBatchResponse = {
+    val userName = fe.getSessionUser(Map.empty[String, String])
+    val ipAddress = fe.getIpAddress
+    val kyuubiInstance = request.getKyuubiInstance
+    val newKyuubiInstance = fe.connectionUrl
+    info(s"Received reassign $kyuubiInstance batch sessions request from 
$userName/$ipAddress")
+    if (!fe.isAdministrator(userName)) {
+      throw new ForbiddenException(s"$userName is not allowed to reassign the 
batches")
+    }
+    if (kyuubiInstance == newKyuubiInstance) {
+      throw new IllegalStateException(s"KyuubiInstance is alive: 
$kyuubiInstance")
+    }
+    val internalRestClient = getInternalRestClient(kyuubiInstance)
+    if (!Utils.isTesting && internalRestClient.pingAble()) {
+      throw new IllegalStateException(s"KyuubiInstance is alive: 
$kyuubiInstance")
+    }
+    try {
+      val batchIds = sessionManager.reassignBatchSessions(kyuubiInstance, 
newKyuubiInstance)
+      val recoveredBatchIds = fe.recoverBatchSessionsFromReassign(batchIds)
+      new ReassignBatchResponse(recoveredBatchIds.asJava, kyuubiInstance, 
newKyuubiInstance)
+    } catch {
+      case e: Exception =>
+        throw new Exception(s"Error reassign batches from $kyuubiInstance to 
$newKyuubiInstance", e)
+    }
+  }
+
   private def handleUploadingFiles(
       batchId: String,
       request: BatchRequest,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
index e6f97efb5e..36dd633161 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/InternalRestClient.scala
@@ -20,8 +20,9 @@ package org.apache.kyuubi.server.api.v1
 import java.util.Base64
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
-import org.apache.kyuubi.client.{BatchRestApi, KyuubiRestClient}
+import org.apache.kyuubi.client.{BaseRestApi, BatchRestApi, KyuubiRestClient}
 import org.apache.kyuubi.client.api.v1.dto.{Batch, CloseBatchResponse, 
OperationLog}
 import org.apache.kyuubi.client.auth.AuthHeaderGenerator
 import org.apache.kyuubi.server.http.authentication.AuthSchemes
@@ -53,6 +54,9 @@ class InternalRestClient(
   }
 
   private val internalBatchRestApi = new BatchRestApi(initKyuubiRestClient())
+  private val internalBaseRestApi = new BaseRestApi(initKyuubiRestClient())
+
+  def pingAble(): Boolean = Try(internalBaseRestApi.ping()).isSuccess
 
   def getBatch(user: String, clientIp: String, batchId: String): Batch = {
     withAuthUser(user) {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index ce8e4d0013..3fe518e0c1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, 
KYUUBI_SESSION_CONNECTION_URL_KEY}
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -54,6 +54,8 @@ class KyuubiBatchSession(
     conf,
     sessionManager) {
   override val sessionType: SessionType = SessionType.BATCH
+  override val connectionUrl: String =
+    
metadata.map(_.kyuubiInstance).getOrElse(conf.getOrElse(KYUUBI_SESSION_CONNECTION_URL_KEY,
 ""))
 
   override val handle: SessionHandle = {
     val batchId = 
metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9423521f6b..344da0e71e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -320,18 +320,58 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
         kyuubiInstance,
         0,
         Int.MaxValue).map { metadata =>
-        createBatchSession(
-          metadata.username,
-          "anonymous",
-          metadata.ipAddress,
-          metadata.requestConf,
-          metadata.engineType,
-          Option(metadata.requestName),
-          metadata.resource,
-          metadata.className,
-          metadata.requestArgs,
-          Some(metadata),
-          fromRecovery = true)
+        createBatchSessionFromRecovery(metadata)
+      }).getOrElse(Seq.empty)
+    }
+  }
+
+  def getSpecificBatchSessionsToRecover(
+      batchIds: Seq[String],
+      kyuubiInstance: String): Seq[KyuubiBatchSession] = {
+    val batchStatesToRecovery = Set(OperationState.PENDING, 
OperationState.RUNNING)
+    batchIds.flatMap { batchId =>
+      getBatchSession(SessionHandle.fromUUID(batchId)) match {
+        case Some(_) =>
+          warn(s"Batch session $batchId is already active, skipping recovery.")
+          None
+        case None =>
+          getBatchMetadata(batchId)
+            .filter(m =>
+              m.kyuubiInstance == kyuubiInstance && 
batchStatesToRecovery.contains(m.opState))
+            .flatMap { metadata => 
Some(createBatchSessionFromRecovery(metadata)) }
+      }
+    }
+  }
+
+  private def createBatchSessionFromRecovery(metadata: Metadata): 
KyuubiBatchSession = {
+    createBatchSession(
+      metadata.username,
+      "anonymous",
+      metadata.ipAddress,
+      metadata.requestConf,
+      metadata.engineType,
+      Option(metadata.requestName),
+      metadata.resource,
+      metadata.className,
+      metadata.requestArgs,
+      Some(metadata),
+      fromRecovery = true)
+  }
+
+  def reassignBatchSessions(
+      kyuubiInstance: String,
+      newKyuubiInstance: String): Seq[String] = {
+    Seq(OperationState.PENDING, OperationState.RUNNING).flatMap { 
stateToRecover =>
+      metadataManager.map(_.getBatchesRecoveryMetadata(
+        stateToRecover.toString,
+        kyuubiInstance,
+        0,
+        Int.MaxValue).map { metadata =>
+        updateMetadata(Metadata(
+          identifier = metadata.identifier,
+          kyuubiInstance = newKyuubiInstance))
+        info(s"Reassign batch ${metadata.identifier} from $kyuubiInstance to 
$newKyuubiInstance")
+        metadata.identifier
       }).getOrElse(Seq.empty)
     }
   }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 4d32de7026..f836f6f157 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -630,6 +630,105 @@ abstract class BatchesResourceSuiteBase extends 
KyuubiFunSuite
       Int.MaxValue).size === 2)
   }
 
+  test("reassign batch") {
+    val sessionManager = 
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+    val kyuubiInstance = fe.connectionUrl
+
+    assert(sessionManager.getActiveUserSessionCount === 0)
+    val batchId1 = UUID.randomUUID().toString
+    val batchId2 = UUID.randomUUID().toString
+
+    val batchMetadata = Metadata(
+      identifier = batchId1,
+      sessionType = SessionType.BATCH,
+      realUser = "kyuubi",
+      username = "kyuubi",
+      ipAddress = "localhost",
+      kyuubiInstance = "other_kyuubi_instance:10099",
+      state = OperationState.PENDING.toString,
+      resource = sparkBatchTestResource.get,
+      className = sparkBatchTestMainClass,
+      requestName = "PENDING_RECOVERY",
+      requestConf = Map("spark.master" -> "local"),
+      requestArgs = Seq.empty,
+      createTime = System.currentTimeMillis(),
+      engineType = "SPARK")
+
+    val batchMetadata2 = batchMetadata.copy(
+      identifier = batchId2,
+      requestName = "RUNNING_RECOVERY")
+    sessionManager.insertMetadata(batchMetadata)
+    sessionManager.insertMetadata(batchMetadata2)
+
+    
assert(sessionManager.getBatchFromMetadataStore(batchId1).map(_.getState).contains("PENDING"))
+    
assert(sessionManager.getBatchFromMetadataStore(batchId2).map(_.getState).contains("PENDING"))
+
+    val sparkBatchProcessBuilder = new SparkBatchProcessBuilder(
+      "kyuubi",
+      conf,
+      batchId2,
+      "RUNNING_RECOVERY",
+      sparkBatchTestResource,
+      sparkBatchTestMainClass,
+      batchMetadata2.requestConf,
+      batchMetadata2.requestArgs,
+      None)
+    sparkBatchProcessBuilder.start
+
+    var applicationStatus: Option[ApplicationInfo] = None
+    eventually(timeout(5.seconds)) {
+      applicationStatus =
+        
sessionManager.applicationManager.getApplicationInfo(ApplicationManagerInfo(None),
 batchId2)
+      assert(applicationStatus.isDefined)
+    }
+
+    val metadataToUpdate = Metadata(
+      identifier = batchId2,
+      state = OperationState.RUNNING.toString,
+      engineId = applicationStatus.get.id,
+      engineName = applicationStatus.get.name,
+      engineUrl = applicationStatus.get.url.orNull,
+      engineState = applicationStatus.get.state.toString,
+      engineError = applicationStatus.get.error)
+    sessionManager.updateMetadata(metadataToUpdate)
+
+    val requestObj = new ReassignBatchRequest("other_kyuubi_instance:10099")
+    val response = webTarget.path("api/v1/batches/reassign")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .header(AUTHORIZATION_HEADER, 
basicAuthorizationHeader(Utils.currentUser))
+      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+    assert(response.getStatus === 200)
+    val batch = response.readEntity(classOf[ReassignBatchResponse])
+    assert(batch.getBatchIds.size() === 2)
+    assert(batch.getBatchIds.contains(batchId1))
+    assert(batch.getBatchIds.contains(batchId2))
+    
assert(sessionManager.getBatchMetadata(batchId1).map(_.kyuubiInstance).contains(kyuubiInstance))
+    
assert(sessionManager.getBatchMetadata(batchId2).map(_.kyuubiInstance).contains(kyuubiInstance))
+    assert(sessionManager.getActiveUserSessionCount === 2)
+
+    val sessionHandle1 = SessionHandle.fromUUID(batchId1)
+    val sessionHandle2 = SessionHandle.fromUUID(batchId2)
+    val session1 = 
sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSession]
+    val session2 = 
sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSession]
+    assert(session1.createTime === batchMetadata.createTime)
+    assert(session2.createTime === batchMetadata2.createTime)
+
+    eventually(timeout(10.seconds)) {
+      val batch1State = session1.batchJobSubmissionOp.getStatus.state
+      assert(batch1State === OperationState.RUNNING || 
OperationState.isTerminal(batch1State))
+      assert(session1.batchJobSubmissionOp.builder.processLaunched)
+
+      val batch2State = session2.batchJobSubmissionOp.getStatus.state
+      assert(batch2State === OperationState.RUNNING || 
OperationState.isTerminal(batch2State))
+      assert(!session2.batchJobSubmissionOp.builder.processLaunched)
+    }
+
+    assert(sessionManager.getBatchesFromMetadataStore(
+      MetadataFilter(engineType = "SPARK"),
+      0,
+      Int.MaxValue).size === 2)
+  }
+
   test("get local log internal redirection") {
     val sessionManager = 
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
     val metadata = Metadata(
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BaseRestApiSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BaseRestApiSuite.scala
new file mode 100644
index 0000000000..15381f7c30
--- /dev/null
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BaseRestApiSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.rest.client
+
+import org.apache.kyuubi.{KYUUBI_VERSION, RestClientTestHelper}
+import org.apache.kyuubi.client.{BaseRestApi, KyuubiRestClient}
+
+class BaseRestApiSuite extends RestClientTestHelper {
+  test("base rest apis") {
+    val basicKyuubiRestClient: KyuubiRestClient =
+      KyuubiRestClient.builder(baseUri.toString)
+        .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC)
+        .username(ldapUser)
+        .password(ldapUserPasswd)
+        .socketTimeout(30000)
+        .build()
+    val baseRestApi: BaseRestApi = new BaseRestApi(basicKyuubiRestClient)
+
+    assert(baseRestApi.ping() == "pong")
+    assert(baseRestApi.getVersionInfo.getVersion == KYUUBI_VERSION)
+  }
+}

Reply via email to