This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ae41cb5ad [CELEBORN-1537] Support to remove workers unavailable info 
with RESTful api
ae41cb5ad is described below

commit ae41cb5adeb7ed4527883cc5ee3f5e75b90043eb
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Aug 19 11:10:33 2024 +0800

    [CELEBORN-1537] Support to remove workers unavailable info with RESTful api
    
    ### What changes were proposed in this pull request?
    In [CELEBORN-1535](https://issues.apache.org/jira/browse/CELEBORN-1535), we 
support to disable master workerUnavilableInfo expiration.
    
     In this PR,  a new RestAPI  introduced for manually remove unavailable 
workers. Then it can be used on demand.
    
    ### Why are the changes needed?
    To cleanup the works unavailable info on demand manually if we disable the 
expiration.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, a new RESTful API.
    
    ### How was this patch tested?
    UT.
    
    Closes #2658 from turboFei/support_cleanup.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 common/src/main/proto/TransportMessages.proto      |   4 +
 .../common/protocol/message/ControlMessages.scala  |   7 ++
 .../celeborn/service/deploy/master/Master.scala    |  38 +++++--
 .../deploy/master/http/api/v1/WorkerResource.scala |  19 +++-
 .../http/api/v1/ApiV1MasterResourceSuite.scala     |   9 +-
 .../apache/celeborn/rest/v1/master/WorkerApi.java  |  70 ++++++++++++
 .../model/RemoveWorkersUnavailableInfoRequest.java | 120 +++++++++++++++++++++
 .../src/main/openapi3/master_rest_v1.yaml          |  28 +++++
 .../http/api/v1/ApiV1OpenapiClientSuite.scala      |   6 +-
 9 files changed, 286 insertions(+), 15 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index d08feee4b..154da825c 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -522,6 +522,10 @@ message PbRemoveWorkersUnavailableInfo {
   string requestId = 2;
 }
 
+message PbRemoveWorkersUnavailableInfoResponse {
+  bool success = 1;
+}
+
 message PbWorkerExclude {
   repeated PbWorkerInfo workersToAdd = 1;
   repeated PbWorkerInfo workersToRemove = 2;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index b727e1b18..48d55405a 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -412,6 +412,13 @@ object ControlMessages extends Logging {
         .build()
   }
 
+  object RemoveWorkersUnavailableInfoResponse {
+    def apply(success: Boolean): PbRemoveWorkersUnavailableInfoResponse =
+      PbRemoveWorkersUnavailableInfoResponse.newBuilder()
+        .setSuccess(success)
+        .build()
+  }
+
   object WorkerEventRequest {
     def apply(
         workers: util.List[WorkerInfo],
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 182ce03a7..4bfdc2996 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -389,12 +389,6 @@ private[celeborn] class Master(
       executeWithLeaderChecker(
         null,
         handleWorkerLost(null, host, rpcPort, pushPort, fetchPort, 
replicatePort, requestId))
-    case pb: PbRemoveWorkersUnavailableInfo =>
-      val unavailableWorkers = new 
util.ArrayList[WorkerInfo](pb.getWorkerInfoList
-        .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
-      executeWithLeaderChecker(
-        null,
-        handleRemoveWorkersUnavailableInfos(unavailableWorkers, 
pb.getRequestId))
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
@@ -562,6 +556,13 @@ private[celeborn] class Master(
     case pb: PbApplicationMetaRequest =>
       // This request is from a worker
       executeWithLeaderChecker(context, 
handleRequestForApplicationMeta(context, pb))
+
+    case pb: PbRemoveWorkersUnavailableInfo =>
+      val unavailableWorkers = new 
util.ArrayList[WorkerInfo](pb.getWorkerInfoList
+        .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+      executeWithLeaderChecker(
+        context,
+        handleRemoveWorkersUnavailableInfos(context, unavailableWorkers, 
pb.getRequestId))
   }
 
   private def timeoutDeadWorkers(): Unit = {
@@ -596,13 +597,11 @@ private[celeborn] class Master(
 
     val unavailableInfoTimeoutWorkers = 
statusSystem.lostWorkers.asScala.filter {
       case (_, lostTime) => currentTime - lostTime > 
workerUnavailableInfoExpireTimeoutMs
-    }.keySet.toList.asJava
+    }.keySet.toSeq
 
     if (!unavailableInfoTimeoutWorkers.isEmpty) {
-      logDebug(s"Remove unavailable info for workers: 
$unavailableInfoTimeoutWorkers")
-      self.send(RemoveWorkersUnavailableInfo(
-        unavailableInfoTimeoutWorkers,
-        MasterClient.genRequestId()))
+      val handleResponse = 
removeWorkersUnavailableInfo(unavailableInfoTimeoutWorkers)
+      logDebug(s"Remove unavailable info for workers response: 
$handleResponse")
     }
   }
 
@@ -1081,9 +1080,13 @@ private[celeborn] class Master(
   }
 
   private def handleRemoveWorkersUnavailableInfos(
+      context: RpcCallContext,
       unavailableWorkers: util.List[WorkerInfo],
       requestId: String): Unit = {
     statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers, 
requestId)
+    if (context != null) {
+      context.reply(RemoveWorkersUnavailableInfoResponse(true))
+    }
   }
 
   private def handleResourceConsumption(userIdentifier: UserIdentifier): 
ResourceConsumption = {
@@ -1343,6 +1346,19 @@ private[celeborn] class Master(
     workerExcludeResponse.getSuccess -> sb.toString()
   }
 
+  def removeWorkersUnavailableInfo(unavailableWorkers: Seq[WorkerInfo]): 
HandleResponse = {
+    val removeWorkersUnavailableInfoResponse =
+      
self.askSync[PbRemoveWorkersUnavailableInfoResponse](RemoveWorkersUnavailableInfo(
+        unavailableWorkers.asJava,
+        MasterClient.genRequestId()))
+    if (removeWorkersUnavailableInfoResponse.getSuccess) {
+      true -> s"Remove unavailable info for workers 
${unavailableWorkers.map(_.readableAddress).mkString(",")} successfully."
+    } else {
+      false -> s"Failed to remove unavailable info for workers 
${unavailableWorkers.map(
+        _.readableAddress).mkString(",")}."
+    }
+  }
+
   private def isMasterActive: Int = {
     // use int rather than bool for better monitoring on dashboard
     val isActive =
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
index 92e5a9dad..7e376727a 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
@@ -26,7 +26,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
 
-import org.apache.celeborn.rest.v1.model.{ExcludeWorkerRequest, 
HandleResponse, SendWorkerEventRequest, WorkerEventData, WorkerEventInfoData, 
WorkerEventsResponse, WorkersResponse, WorkerTimestampData}
+import org.apache.celeborn.rest.v1.model.{ExcludeWorkerRequest, 
HandleResponse, RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, 
WorkerEventData, WorkerEventInfoData, WorkerEventsResponse, WorkersResponse, 
WorkerTimestampData}
 import org.apache.celeborn.server.common.http.api.ApiRequestContext
 import org.apache.celeborn.server.common.http.api.v1.ApiUtils
 import org.apache.celeborn.service.deploy.master.Master
@@ -35,7 +35,8 @@ import org.apache.celeborn.service.deploy.master.Master
 @Produces(Array(MediaType.APPLICATION_JSON))
 @Consumes(Array(MediaType.APPLICATION_JSON))
 class WorkerResource extends ApiRequestContext {
-  private def statusSystem = httpService.asInstanceOf[Master].statusSystem
+  private def master: Master = httpService.asInstanceOf[Master]
+  private def statusSystem = master.statusSystem
 
   @ApiResponse(
     responseCode = "200",
@@ -78,6 +79,20 @@ class WorkerResource extends ApiRequestContext {
     new HandleResponse().success(success).message(msg)
   }
 
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[HandleResponse]))),
+    description = "Remove the workers unavailable info from the master.")
+  @POST
+  @Path("/remove_unavailable")
+  def removeWorkersUnavailableInfo(request: 
RemoveWorkersUnavailableInfoRequest): HandleResponse = {
+    val (success, msg) = master.removeWorkersUnavailableInfo(
+      request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq)
+    new HandleResponse().success(success).message(msg)
+  }
+
   @ApiResponse(
     responseCode = "200",
     content = Array(new Content(
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
index 74ca872c3..388b26b2e 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala
@@ -26,7 +26,7 @@ import com.google.common.io.Files
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
-import org.apache.celeborn.rest.v1.model.{AppDiskUsageSnapshotsResponse, 
ApplicationsResponse, ExcludeWorkerRequest, HandleResponse, HostnamesResponse, 
SendWorkerEventRequest, ShufflesResponse, WorkerEventsResponse, WorkerId, 
WorkersResponse}
+import org.apache.celeborn.rest.v1.model.{AppDiskUsageSnapshotsResponse, 
ApplicationsResponse, ExcludeWorkerRequest, HandleResponse, HostnamesResponse, 
RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, ShufflesResponse, 
WorkerEventsResponse, WorkerId, WorkersResponse}
 import org.apache.celeborn.server.common.HttpService
 import org.apache.celeborn.server.common.http.api.v1.ApiV1BaseResourceSuite
 import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
@@ -122,6 +122,13 @@ class ApiV1MasterResourceSuite extends 
ApiV1BaseResourceSuite {
     assert(response.readEntity(classOf[HandleResponse]).getMessage.contains(
       "Unknown workers Host:unknown.celeborn"))
 
+    val removeWorkersUnavailableInfoRequest = new 
RemoveWorkersUnavailableInfoRequest()
+      .workers(Collections.singletonList(worker))
+    response =
+      
webTarget.path("workers/remove_unavailable").request(MediaType.APPLICATION_JSON).post(
+        Entity.entity(removeWorkersUnavailableInfoRequest, 
MediaType.APPLICATION_JSON))
+    assert(HttpServletResponse.SC_OK == response.getStatus)
+
     response = 
webTarget.path("workers/events").request(MediaType.APPLICATION_JSON).get()
     assert(HttpServletResponse.SC_OK == response.getStatus)
     
assert(response.readEntity(classOf[WorkerEventsResponse]).getWorkerEvents.isEmpty)
diff --git 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
index ac5a861bc..c8e237314 100644
--- 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
+++ 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
@@ -27,6 +27,7 @@ import org.apache.celeborn.rest.v1.master.invoker.Pair;
 
 import org.apache.celeborn.rest.v1.model.ExcludeWorkerRequest;
 import org.apache.celeborn.rest.v1.model.HandleResponse;
+import org.apache.celeborn.rest.v1.model.RemoveWorkersUnavailableInfoRequest;
 import org.apache.celeborn.rest.v1.model.SendWorkerEventRequest;
 import org.apache.celeborn.rest.v1.model.WorkerEventsResponse;
 import org.apache.celeborn.rest.v1.model.WorkersResponse;
@@ -253,6 +254,75 @@ public class WorkerApi extends BaseApi {
     );
   }
 
+  /**
+   * 
+   * Remove the workers unavailable info from the master.
+   * @param removeWorkersUnavailableInfoRequest  (optional)
+   * @return HandleResponse
+   * @throws ApiException if fails to make API call
+   */
+  public HandleResponse 
removeWorkersUnavailableInfo(RemoveWorkersUnavailableInfoRequest 
removeWorkersUnavailableInfoRequest) throws ApiException {
+    return 
this.removeWorkersUnavailableInfo(removeWorkersUnavailableInfoRequest, 
Collections.emptyMap());
+  }
+
+
+  /**
+   * 
+   * Remove the workers unavailable info from the master.
+   * @param removeWorkersUnavailableInfoRequest  (optional)
+   * @param additionalHeaders additionalHeaders for this call
+   * @return HandleResponse
+   * @throws ApiException if fails to make API call
+   */
+  public HandleResponse 
removeWorkersUnavailableInfo(RemoveWorkersUnavailableInfoRequest 
removeWorkersUnavailableInfoRequest, Map<String, String> additionalHeaders) 
throws ApiException {
+    Object localVarPostBody = removeWorkersUnavailableInfoRequest;
+    
+    // create path and map variables
+    String localVarPath = "/api/v1/workers/remove_unavailable";
+
+    StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+    String localVarQueryParameterBaseName;
+    List<Pair> localVarQueryParams = new ArrayList<Pair>();
+    List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+    Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+    Map<String, String> localVarCookieParams = new HashMap<String, String>();
+    Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+    
+    localVarHeaderParams.putAll(additionalHeaders);
+
+    
+    
+    final String[] localVarAccepts = {
+      "application/json"
+    };
+    final String localVarAccept = 
apiClient.selectHeaderAccept(localVarAccepts);
+
+    final String[] localVarContentTypes = {
+      "application/json"
+    };
+    final String localVarContentType = 
apiClient.selectHeaderContentType(localVarContentTypes);
+
+    String[] localVarAuthNames = new String[] { "basic" };
+
+    TypeReference<HandleResponse> localVarReturnType = new 
TypeReference<HandleResponse>() {};
+    return apiClient.invokeAPI(
+        localVarPath,
+        "POST",
+        localVarQueryParams,
+        localVarCollectionQueryParams,
+        localVarQueryStringJoiner.toString(),
+        localVarPostBody,
+        localVarHeaderParams,
+        localVarCookieParams,
+        localVarFormParams,
+        localVarAccept,
+        localVarContentType,
+        localVarAuthNames,
+        localVarReturnType
+    );
+  }
+
   /**
    * 
    * For Master(Leader) can send worker event to manager workers. Legal types 
are &#39;None&#39;, &#39;Immediately&#39;, &#39;Decommission&#39;, 
&#39;DecommissionThenIdle&#39;, &#39;Graceful&#39;, &#39;Recommission&#39;. 
diff --git 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/RemoveWorkersUnavailableInfoRequest.java
 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/RemoveWorkersUnavailableInfoRequest.java
new file mode 100644
index 000000000..3db053928
--- /dev/null
+++ 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/RemoveWorkersUnavailableInfoRequest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.celeborn.rest.v1.model.WorkerId;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * RemoveWorkersUnavailableInfoRequest
+ */
+@JsonPropertyOrder({
+  RemoveWorkersUnavailableInfoRequest.JSON_PROPERTY_WORKERS
+})
[email protected](value = 
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator 
version: 7.7.0")
+public class RemoveWorkersUnavailableInfoRequest {
+  public static final String JSON_PROPERTY_WORKERS = "workers";
+  private List<WorkerId> workers = new ArrayList<>();
+
+  public RemoveWorkersUnavailableInfoRequest() {
+  }
+
+  public RemoveWorkersUnavailableInfoRequest workers(List<WorkerId> workers) {
+    
+    this.workers = workers;
+    return this;
+  }
+
+  public RemoveWorkersUnavailableInfoRequest addWorkersItem(WorkerId 
workersItem) {
+    if (this.workers == null) {
+      this.workers = new ArrayList<>();
+    }
+    this.workers.add(workersItem);
+    return this;
+  }
+
+  /**
+   * The workers to be removed from the master workers unavailable info.
+   * @return workers
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_WORKERS)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public List<WorkerId> getWorkers() {
+    return workers;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_WORKERS)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setWorkers(List<WorkerId> workers) {
+    this.workers = workers;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RemoveWorkersUnavailableInfoRequest removeWorkersUnavailableInfoRequest = 
(RemoveWorkersUnavailableInfoRequest) o;
+    return Objects.equals(this.workers, 
removeWorkersUnavailableInfoRequest.workers);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(workers);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class RemoveWorkersUnavailableInfoRequest {\n");
+    sb.append("    workers: ").append(toIndentedString(workers)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml 
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index 73a860913..57fc5d472 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -162,6 +162,25 @@ paths:
               schema:
                 $ref: '#/components/schemas/HandleResponse'
 
+  /api/v1/workers/remove_unavailable:
+    post:
+      tags:
+        - Worker
+      operationId: removeWorkersUnavailableInfo
+      description: Remove the workers unavailable info from the master.
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/RemoveWorkersUnavailableInfoRequest'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HandleResponse'
+
   /api/v1/workers/events:
     get:
       tags:
@@ -671,6 +690,15 @@ components:
           items:
             $ref: '#/components/schemas/WorkerId'
 
+    RemoveWorkersUnavailableInfoRequest:
+      type: object
+      properties:
+        workers:
+          type: array
+          description: The workers to be removed from the master workers 
unavailable info.
+          items:
+            $ref: '#/components/schemas/WorkerId'
+
     SendWorkerEventRequest:
       type: object
       properties:
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1OpenapiClientSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1OpenapiClientSuite.scala
index 2e077f10e..e5368cf6d 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1OpenapiClientSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1OpenapiClientSuite.scala
@@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletResponse
 
 import org.apache.celeborn.rest.v1.master._
 import org.apache.celeborn.rest.v1.master.invoker._
-import org.apache.celeborn.rest.v1.model.{ExcludeWorkerRequest, 
SendWorkerEventRequest, WorkerId}
+import org.apache.celeborn.rest.v1.model.{ExcludeWorkerRequest, 
RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, WorkerId}
 import org.apache.celeborn.rest.v1.model.SendWorkerEventRequest.EventTypeEnum
 
 class ApiV1OpenapiClientSuite extends ApiV1WorkerOpenapiClientSuite {
@@ -95,6 +95,10 @@ class ApiV1OpenapiClientSuite extends 
ApiV1WorkerOpenapiClientSuite {
       new 
ExcludeWorkerRequest().addRemoveItem(workerId).add(Collections.emptyList()))
     assert(handleResponse.getSuccess)
 
+    handleResponse = api.removeWorkersUnavailableInfo(
+      new RemoveWorkersUnavailableInfoRequest().addWorkersItem(workerId));
+    assert(handleResponse.getSuccess)
+
     workersResponse = api.getWorkers
     assert(!workersResponse.getWorkers.isEmpty)
     assert(workersResponse.getExcludedWorkers.isEmpty)

Reply via email to