This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c4c32995d [CELEBORN-1593][FOLLOWUP] Add CLI support for some missing 
HTTP APIs
c4c32995d is described below

commit c4c32995d3c0e401905b55923605ddd89d3a2fb7
Author: Aravind Patnam <[email protected]>
AuthorDate: Wed Sep 18 15:40:02 2024 +0800

    [CELEBORN-1593][FOLLOWUP] Add CLI support for some missing HTTP APIs
    
    ### What changes were proposed in this pull request?
    Adding support for `removeWorkersUnavailableInfo`, 
`getDecommissioningWorkers` and `runIsDecommissioning`. These were missed in 
the last PR.
    
    ### Why are the changes needed?
    see above
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    added unit tests
    
    Closes #2735 from akpatnam25/CELEBORN-1593.
    
    Authored-by: Aravind Patnam <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 LICENSE-binary                                     |  1 +
 .../apache/celeborn/cli/master/MasterOptions.scala | 10 +++++++++
 .../celeborn/cli/master/MasterSubcommand.scala     |  4 ++++
 .../celeborn/cli/master/MasterSubcommandImpl.scala | 26 ++++++++++++++++++++--
 .../apache/celeborn/cli/worker/WorkerOptions.scala |  5 +++++
 .../celeborn/cli/worker/WorkerSubcommand.scala     |  2 ++
 .../celeborn/cli/worker/WorkerSubcommandImpl.scala |  3 +++
 .../celeborn/cli/TestCelebornCliCommands.scala     | 18 +++++++++++++++
 8 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 32ab78a7c..20d746b1b 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -221,6 +221,7 @@ commons-cli:commons-cli
 commons-io:commons-io
 commons-logging:commons-logging
 com.zaxxer:HikariCP
+info.picocli:picocli
 io.dropwizard.metrics:metrics-core
 io.dropwizard.metrics:metrics-graphite
 io.dropwizard.metrics:metrics-jvm
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
index 95fda2028..a90d4e58c 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
@@ -64,6 +64,11 @@ final class MasterOptions {
   @Option(names = Array("--show-shutdown-workers"), description = Array("Show 
shutdown workers"))
   private[master] var showShutdownWorkers: Boolean = _
 
+  @Option(
+    names = Array("--show-decommissioning-workers"),
+    description = Array("Show decommissioning workers"))
+  private[master] var showDecommissioningWorkers: Boolean = _
+
   @Option(
     names = Array("--show-lifecycle-managers"),
     description = Array("Show lifecycle managers"))
@@ -92,4 +97,9 @@ final class MasterOptions {
     paramLabel = "alias",
     description = Array("Remove alias to use in the cli for the given set of 
masters"))
   private[master] var removeClusterAlias: String = _
+
+  @Option(
+    names = Array("--remove-workers-unavailable-info"),
+    description = Array("Remove the workers unavailable info from the 
master."))
+  private[master] var removeWorkersUnavailableInfo: Boolean = _
 }
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
index 756ebb89d..cb67ca20e 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
@@ -80,6 +80,8 @@ trait MasterSubcommand extends CliLogging {
 
   private[master] def runRemoveExcludedWorkers: HandleResponse
 
+  private[master] def runRemoveWorkersUnavailableInfo: HandleResponse
+
   private[master] def runSendWorkerEvent: HandleResponse
 
   private[master] def runShowWorkerEventInfo: WorkerEventsResponse
@@ -90,6 +92,8 @@ trait MasterSubcommand extends CliLogging {
 
   private[master] def runShowShutdownWorkers: Seq[WorkerData]
 
+  private[master] def runShowDecommissioningWorkers: Seq[WorkerData]
+
   private[master] def runShowLifecycleManagers: HostnamesResponse
 
   private[master] def runShowWorkers: WorkersResponse
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
index 984d3bee9..0b8a53c07 100644
--- 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -36,12 +36,14 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
     if (masterOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
     if (masterOptions.excludeWorkers) log(runExcludeWorkers)
     if (masterOptions.removeExcludedWorkers) log(runRemoveExcludedWorkers)
+    if (masterOptions.removeWorkersUnavailableInfo) 
log(runRemoveWorkersUnavailableInfo)
     if (masterOptions.sendWorkerEvent != null && 
masterOptions.sendWorkerEvent.nonEmpty)
       log(runSendWorkerEvent)
     if (masterOptions.showWorkerEventInfo) log(runShowWorkerEventInfo)
     if (masterOptions.showLostWorkers) log(runShowLostWorkers)
     if (masterOptions.showExcludedWorkers) log(runShowExcludedWorkers)
     if (masterOptions.showShutdownWorkers) log(runShowShutdownWorkers)
+    if (masterOptions.showDecommissioningWorkers) 
log(runShowDecommissioningWorkers)
     if (masterOptions.showLifecycleManagers) log(runShowLifecycleManagers)
     if (masterOptions.showWorkers) log(runShowWorkers)
     if (masterOptions.showConf) log(runShowConf)
@@ -66,17 +68,27 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
   private[master] def runExcludeWorkers: HandleResponse = {
     val workerIds = getWorkerIds
     val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds)
-    logInfo(s"Sending exclude worker requests to workers: $workerIds")
+    logInfo(s"Sending exclude worker requests to master for the following 
workers: $workerIds")
     workerApi.excludeWorker(excludeWorkerRequest)
   }
 
   private[master] def runRemoveExcludedWorkers: HandleResponse = {
     val workerIds = getWorkerIds
     val removeExcludeWorkerRequest = new 
ExcludeWorkerRequest().remove(workerIds)
-    logInfo(s"Sending remove exclude worker requests to workers: $workerIds")
+    logInfo(
+      s"Sending remove exclude worker requests to master for the following 
workers: $workerIds")
     workerApi.excludeWorker(removeExcludeWorkerRequest)
   }
 
+  private[master] def runRemoveWorkersUnavailableInfo: HandleResponse = {
+    val workerIds = getWorkerIds
+    val removeWorkersUnavailableInfoRequest =
+      new RemoveWorkersUnavailableInfoRequest().workers(workerIds)
+    logInfo(
+      s"Sending remove workers unavailable info requests to master for the 
following workers: $workerIds")
+    workerApi.removeWorkersUnavailableInfo(removeWorkersUnavailableInfoRequest)
+  }
+
   private[master] def runSendWorkerEvent: HandleResponse = {
     val eventType = {
       try {
@@ -127,6 +139,16 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
     }
   }
 
+  private[master] def runShowDecommissioningWorkers: Seq[WorkerData] = {
+    val decommissioningWorkers = 
runShowWorkers.getDecommissioningWorkers.asScala.toSeq
+    if (decommissioningWorkers.isEmpty) {
+      log("No decommissioning workers found.")
+      Seq.empty[WorkerData]
+    } else {
+      decommissioningWorkers.sortBy(_.getHost)
+    }
+  }
+
   private[master] def runShowLifecycleManagers: HostnamesResponse =
     applicationApi.getApplicationHostNames
 
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
index a0a1c25a7..717664a94 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
@@ -50,6 +50,11 @@ final class WorkerOptions {
   @Option(names = Array("--is-shutdown"), description = Array("Check if the 
system is shutdown"))
   private[worker] var isShutdown: Boolean = _
 
+  @Option(
+    names = Array("--is-decommissioning"),
+    description = Array("Check if the system is decommissioning"))
+  private[worker] var isDecommissioning: Boolean = _
+
   @Option(
     names = Array("--is-registered"),
     description = Array("Check if the system is registered"))
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
index 7690c364e..517ec802c 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
@@ -70,6 +70,8 @@ trait WorkerSubcommand extends CliLogging {
 
   private[worker] def runIsShutdown: Boolean
 
+  private[worker] def runIsDecommissioning: Boolean
+
   private[worker] def runIsRegistered: Boolean
 
   private[worker] def runExit: HandleResponse
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
index eee9e56dd..62289c35f 100644
--- 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
@@ -33,6 +33,7 @@ class WorkerSubcommandImpl extends Runnable with 
WorkerSubcommand {
     if (workerOptions.showPartitionLocationInfo) 
log(runShowPartitionLocationInfo)
     if (workerOptions.showUnavailablePeers) log(runShowUnavailablePeers)
     if (workerOptions.isShutdown) log(runIsShutdown)
+    if (workerOptions.isDecommissioning) log(runIsDecommissioning)
     if (workerOptions.isRegistered) log(runIsRegistered)
     if (workerOptions.exitType != null && workerOptions.exitType.nonEmpty) 
log(runExit)
     if (workerOptions.showConf) log(runShowConf)
@@ -57,6 +58,8 @@ class WorkerSubcommandImpl extends Runnable with 
WorkerSubcommand {
 
   private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown
 
+  private[worker] def runIsDecommissioning: Boolean = 
runShowWorkerInfo.getIsDecommissioning
+
   private[worker] def runIsRegistered: Boolean = 
runShowWorkerInfo.getIsRegistered
 
   private[worker] def runExit: HandleResponse = {
diff --git 
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala 
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index ea36e0ac8..b0ce832ec 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -104,6 +104,11 @@ class TestCelebornCliCommands extends CelebornFunSuite 
with MiniClusterFeature {
     captureOutputAndValidateResponse(args, "false")
   }
 
+  test("worker --is-decommissioning") {
+    val args = prepareWorkerArgs() :+ "--is-decommissioning"
+    captureOutputAndValidateResponse(args, "false")
+  }
+
   test("worker --is-registered") {
     val args = prepareWorkerArgs() :+ "--is-registered"
     captureOutputAndValidateResponse(args, "true")
@@ -166,6 +171,11 @@ class TestCelebornCliCommands extends CelebornFunSuite 
with MiniClusterFeature {
     captureOutputAndValidateResponse(args, "No shutdown workers found.")
   }
 
+  test("master --show-decommissioning-workers") {
+    val args = prepareMasterArgs() :+ "--show-decommissioning-workers"
+    captureOutputAndValidateResponse(args, "No decommissioning workers found.")
+  }
+
   test("master --show-lifecycle-managers") {
     val args = prepareMasterArgs() :+ "--show-lifecycle-managers"
     captureOutputAndValidateResponse(args, "HostnamesResponse")
@@ -214,6 +224,14 @@ class TestCelebornCliCommands extends CelebornFunSuite 
with MiniClusterFeature {
     captureOutputAndValidateResponse(args, "success: true")
   }
 
+  test("master --remove-workers-unavailable-info") {
+    val args = prepareMasterArgs() ++ Array(
+      "--remove-workers-unavailable-info",
+      "--worker-ids",
+      getWorkerId())
+    captureOutputAndValidateResponse(args, "success: true")
+  }
+
   private def prepareMasterArgs(): Array[String] = {
     Array(
       "master",

Reply via email to