This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c4c32995d [CELEBORN-1593][FOLLOWUP] Add CLI support for some missing
HTTP APIs
c4c32995d is described below
commit c4c32995d3c0e401905b55923605ddd89d3a2fb7
Author: Aravind Patnam <[email protected]>
AuthorDate: Wed Sep 18 15:40:02 2024 +0800
[CELEBORN-1593][FOLLOWUP] Add CLI support for some missing HTTP APIs
### What changes were proposed in this pull request?
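Add CLI support for the master HTTP APIs `removeWorkersUnavailableInfo` and
`getDecommissioningWorkers`, and for checking whether a worker is
decommissioning (`runIsDecommissioning`). The new options are
`master --remove-workers-unavailable-info`, `master --show-decommissioning-workers`
and `worker --is-decommissioning`. These were missed in the previous
CELEBORN-1593 PR. Also add `info.picocli:picocli` to LICENSE-binary.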
### Why are the changes needed?
These HTTP APIs had no corresponding CLI options, so they could not be invoked through the CLI (see above).
### Does this PR introduce _any_ user-facing change?
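Yes. It adds the new CLI options `master --show-decommissioning-workers`,
`master --remove-workers-unavailable-info` and `worker --is-decommissioning`.
It also rewords the exclude-worker and remove-excluded-worker log messages to say the requests are sent to the master.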
### How was this patch tested?
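Added unit tests in `TestCelebornCliCommands` for `worker --is-decommissioning`,
`master --show-decommissioning-workers` and `master --remove-workers-unavailable-info`.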
Closes #2735 from akpatnam25/CELEBORN-1593.
Authored-by: Aravind Patnam <[email protected]>
Signed-off-by: mingji <[email protected]>
---
LICENSE-binary | 1 +
.../apache/celeborn/cli/master/MasterOptions.scala | 10 +++++++++
.../celeborn/cli/master/MasterSubcommand.scala | 4 ++++
.../celeborn/cli/master/MasterSubcommandImpl.scala | 26 ++++++++++++++++++++--
.../apache/celeborn/cli/worker/WorkerOptions.scala | 5 +++++
.../celeborn/cli/worker/WorkerSubcommand.scala | 2 ++
.../celeborn/cli/worker/WorkerSubcommandImpl.scala | 3 +++
.../celeborn/cli/TestCelebornCliCommands.scala | 18 +++++++++++++++
8 files changed, 67 insertions(+), 2 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 32ab78a7c..20d746b1b 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -221,6 +221,7 @@ commons-cli:commons-cli
commons-io:commons-io
commons-logging:commons-logging
com.zaxxer:HikariCP
+info.picocli:picocli
io.dropwizard.metrics:metrics-core
io.dropwizard.metrics:metrics-graphite
io.dropwizard.metrics:metrics-jvm
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
index 95fda2028..a90d4e58c 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
@@ -64,6 +64,11 @@ final class MasterOptions {
@Option(names = Array("--show-shutdown-workers"), description = Array("Show
shutdown workers"))
private[master] var showShutdownWorkers: Boolean = _
+ @Option(
+ names = Array("--show-decommissioning-workers"),
+ description = Array("Show decommissioning workers"))
+ private[master] var showDecommissioningWorkers: Boolean = _
+
@Option(
names = Array("--show-lifecycle-managers"),
description = Array("Show lifecycle managers"))
@@ -92,4 +97,9 @@ final class MasterOptions {
paramLabel = "alias",
description = Array("Remove alias to use in the cli for the given set of
masters"))
private[master] var removeClusterAlias: String = _
+
+ @Option(
+ names = Array("--remove-workers-unavailable-info"),
+ description = Array("Remove the workers unavailable info from the
master."))
+ private[master] var removeWorkersUnavailableInfo: Boolean = _
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
index 756ebb89d..cb67ca20e 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
@@ -80,6 +80,8 @@ trait MasterSubcommand extends CliLogging {
private[master] def runRemoveExcludedWorkers: HandleResponse
+ private[master] def runRemoveWorkersUnavailableInfo: HandleResponse
+
private[master] def runSendWorkerEvent: HandleResponse
private[master] def runShowWorkerEventInfo: WorkerEventsResponse
@@ -90,6 +92,8 @@ trait MasterSubcommand extends CliLogging {
private[master] def runShowShutdownWorkers: Seq[WorkerData]
+ private[master] def runShowDecommissioningWorkers: Seq[WorkerData]
+
private[master] def runShowLifecycleManagers: HostnamesResponse
private[master] def runShowWorkers: WorkersResponse
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
index 984d3bee9..0b8a53c07 100644
---
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
+++
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -36,12 +36,14 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
if (masterOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
if (masterOptions.excludeWorkers) log(runExcludeWorkers)
if (masterOptions.removeExcludedWorkers) log(runRemoveExcludedWorkers)
+ if (masterOptions.removeWorkersUnavailableInfo)
log(runRemoveWorkersUnavailableInfo)
if (masterOptions.sendWorkerEvent != null &&
masterOptions.sendWorkerEvent.nonEmpty)
log(runSendWorkerEvent)
if (masterOptions.showWorkerEventInfo) log(runShowWorkerEventInfo)
if (masterOptions.showLostWorkers) log(runShowLostWorkers)
if (masterOptions.showExcludedWorkers) log(runShowExcludedWorkers)
if (masterOptions.showShutdownWorkers) log(runShowShutdownWorkers)
+ if (masterOptions.showDecommissioningWorkers)
log(runShowDecommissioningWorkers)
if (masterOptions.showLifecycleManagers) log(runShowLifecycleManagers)
if (masterOptions.showWorkers) log(runShowWorkers)
if (masterOptions.showConf) log(runShowConf)
@@ -66,17 +68,27 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
private[master] def runExcludeWorkers: HandleResponse = {
val workerIds = getWorkerIds
val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds)
- logInfo(s"Sending exclude worker requests to workers: $workerIds")
+ logInfo(s"Sending exclude worker requests to master for the following
workers: $workerIds")
workerApi.excludeWorker(excludeWorkerRequest)
}
private[master] def runRemoveExcludedWorkers: HandleResponse = {
val workerIds = getWorkerIds
val removeExcludeWorkerRequest = new
ExcludeWorkerRequest().remove(workerIds)
- logInfo(s"Sending remove exclude worker requests to workers: $workerIds")
+ logInfo(
+ s"Sending remove exclude worker requests to master for the following
workers: $workerIds")
workerApi.excludeWorker(removeExcludeWorkerRequest)
}
+ private[master] def runRemoveWorkersUnavailableInfo: HandleResponse = {
+ val workerIds = getWorkerIds
+ val removeWorkersUnavailableInfoRequest =
+ new RemoveWorkersUnavailableInfoRequest().workers(workerIds)
+ logInfo(
+ s"Sending remove workers unavailable info requests to master for the
following workers: $workerIds")
+ workerApi.removeWorkersUnavailableInfo(removeWorkersUnavailableInfoRequest)
+ }
+
private[master] def runSendWorkerEvent: HandleResponse = {
val eventType = {
try {
@@ -127,6 +139,16 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
}
}
+ private[master] def runShowDecommissioningWorkers: Seq[WorkerData] = {
+ val decommissioningWorkers =
runShowWorkers.getDecommissioningWorkers.asScala.toSeq
+ if (decommissioningWorkers.isEmpty) {
+ log("No decommissioning workers found.")
+ Seq.empty[WorkerData]
+ } else {
+ decommissioningWorkers.sortBy(_.getHost)
+ }
+ }
+
private[master] def runShowLifecycleManagers: HostnamesResponse =
applicationApi.getApplicationHostNames
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
index a0a1c25a7..717664a94 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
@@ -50,6 +50,11 @@ final class WorkerOptions {
@Option(names = Array("--is-shutdown"), description = Array("Check if the
system is shutdown"))
private[worker] var isShutdown: Boolean = _
+ @Option(
+ names = Array("--is-decommissioning"),
+ description = Array("Check if the system is decommissioning"))
+ private[worker] var isDecommissioning: Boolean = _
+
@Option(
names = Array("--is-registered"),
description = Array("Check if the system is registered"))
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
index 7690c364e..517ec802c 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
@@ -70,6 +70,8 @@ trait WorkerSubcommand extends CliLogging {
private[worker] def runIsShutdown: Boolean
+ private[worker] def runIsDecommissioning: Boolean
+
private[worker] def runIsRegistered: Boolean
private[worker] def runExit: HandleResponse
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
index eee9e56dd..62289c35f 100644
---
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
+++
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
@@ -33,6 +33,7 @@ class WorkerSubcommandImpl extends Runnable with
WorkerSubcommand {
if (workerOptions.showPartitionLocationInfo)
log(runShowPartitionLocationInfo)
if (workerOptions.showUnavailablePeers) log(runShowUnavailablePeers)
if (workerOptions.isShutdown) log(runIsShutdown)
+ if (workerOptions.isDecommissioning) log(runIsDecommissioning)
if (workerOptions.isRegistered) log(runIsRegistered)
if (workerOptions.exitType != null && workerOptions.exitType.nonEmpty)
log(runExit)
if (workerOptions.showConf) log(runShowConf)
@@ -57,6 +58,8 @@ class WorkerSubcommandImpl extends Runnable with
WorkerSubcommand {
private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown
+ private[worker] def runIsDecommissioning: Boolean =
runShowWorkerInfo.getIsDecommissioning
+
private[worker] def runIsRegistered: Boolean =
runShowWorkerInfo.getIsRegistered
private[worker] def runExit: HandleResponse = {
diff --git
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index ea36e0ac8..b0ce832ec 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -104,6 +104,11 @@ class TestCelebornCliCommands extends CelebornFunSuite
with MiniClusterFeature {
captureOutputAndValidateResponse(args, "false")
}
+ test("worker --is-decommissioning") {
+ val args = prepareWorkerArgs() :+ "--is-decommissioning"
+ captureOutputAndValidateResponse(args, "false")
+ }
+
test("worker --is-registered") {
val args = prepareWorkerArgs() :+ "--is-registered"
captureOutputAndValidateResponse(args, "true")
@@ -166,6 +171,11 @@ class TestCelebornCliCommands extends CelebornFunSuite
with MiniClusterFeature {
captureOutputAndValidateResponse(args, "No shutdown workers found.")
}
+ test("master --show-decommissioning-workers") {
+ val args = prepareMasterArgs() :+ "--show-decommissioning-workers"
+ captureOutputAndValidateResponse(args, "No decommissioning workers found.")
+ }
+
test("master --show-lifecycle-managers") {
val args = prepareMasterArgs() :+ "--show-lifecycle-managers"
captureOutputAndValidateResponse(args, "HostnamesResponse")
@@ -214,6 +224,14 @@ class TestCelebornCliCommands extends CelebornFunSuite
with MiniClusterFeature {
captureOutputAndValidateResponse(args, "success: true")
}
+ test("master --remove-workers-unavailable-info") {
+ val args = prepareMasterArgs() ++ Array(
+ "--remove-workers-unavailable-info",
+ "--worker-ids",
+ getWorkerId())
+ captureOutputAndValidateResponse(args, "success: true")
+ }
+
private def prepareMasterArgs(): Array[String] = {
Array(
"master",