This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cc26131f8 [CELEBORN-1572] Celeborn CLI initial REST API support
cc26131f8 is described below
commit cc26131f88e845e306ff35435e10e506a2c058f3
Author: Aravind Patnam <[email protected]>
AuthorDate: Thu Sep 5 11:15:16 2024 -0500
[CELEBORN-1572] Celeborn CLI initial REST API support
### What changes were proposed in this pull request?
Introducing the Celeborn CLI (based on this
[CIP](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI)).
For the first iteration, this adds support for querying the existing REST API
endpoints.
A follow-up will add a layer for external cluster manager support. Further
improvements, such as pretty-printing, are still needed and can be added in
subsequent PRs.
### Why are the changes needed?
See the
[CIP](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI).
### Does this PR introduce _any_ user-facing change?
Yes, a new CLI tool.
### How was this patch tested?
Added unit tests and also tested internally.
Closes #2699 from akpatnam25/cli-CELEBORN-1572.
Lead-authored-by: Aravind Patnam <[email protected]>
Co-authored-by: Aravind Patnam <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
bin/celeborn-class | 11 +
build/make-distribution.sh | 16 +-
build/release/release.sh | 3 +
cli/pom.xml | 83 +++++++
.../org/apache/celeborn/cli/CelebornCli.scala | 47 ++++
.../apache/celeborn/cli/common/CliLogging.scala | 40 ++++
.../celeborn/cli/common/CliVersionProvider.scala | 52 +++++
.../apache/celeborn/cli/common/CommonOptions.scala | 74 ++++++
.../celeborn/cli/config/CliConfigManager.scala | 87 ++++++++
.../apache/celeborn/cli/master/MasterOptions.scala | 94 ++++++++
.../celeborn/cli/master/MasterSubcommand.scala | 105 +++++++++
.../celeborn/cli/master/MasterSubcommandImpl.scala | 187 ++++++++++++++++
.../apache/celeborn/cli/worker/WorkerOptions.scala | 73 ++++++
.../celeborn/cli/worker/WorkerSubcommand.scala | 83 +++++++
.../celeborn/cli/worker/WorkerSubcommandImpl.scala | 78 +++++++
.../celeborn/cli/TestCelebornCliCommands.scala | 247 +++++++++++++++++++++
.../org/apache/celeborn/common/util/Utils.scala | 11 +
pom.xml | 7 +
project/CelebornBuild.scala | 20 +-
sbin/celeborn-cli | 24 ++
20 files changed, 1340 insertions(+), 2 deletions(-)
diff --git a/bin/celeborn-class b/bin/celeborn-class
index 5bb3791ff..32db4fe9d 100755
--- a/bin/celeborn-class
+++ b/bin/celeborn-class
@@ -46,6 +46,9 @@ do
if [ "$i" == "org.apache.celeborn.service.deploy.worker.Worker" ] ; then
LAUNCH_CLASS=org.apache.celeborn.service.deploy.worker.Worker
fi
+ if [ "$i" == "org.apache.celeborn.cli.CelebornCli" ] ; then
+ LAUNCH_CLASS=org.apache.celeborn.cli.CelebornCli
+ fi
done
if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.service.deploy.master.Master" ]
; then
@@ -64,6 +67,14 @@ if [ "${LAUNCH_CLASS}" ==
"org.apache.celeborn.service.deploy.worker.Worker" ] ;
fi
fi
+if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.cli.CelebornCli" ] ; then
+ if [ -d "${CELEBORN_HOME}/cli-jars" ]; then
+ CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli-jars"
+ else
+ CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli/target"
+ fi
+fi
+
if [ ! -d "$CELEBORN_JARS_DIR" ]; then
echo "Failed to find CELEBORN jars directory ($CELEBORN_JARS_DIR)." 1>&2
echo "You need to build CELEBORN with the target \"package\" before running
this program." 1>&2
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 2f0c9aa29..3133c2f3c 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -138,7 +138,7 @@ function build_service {
# Store the command as an array because $MVN variable might have spaces in
it.
# Normal quoting tricks don't work.
# See: http://mywiki.wooledge.org/BashFAQ/050
- BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker -am $@)
+ BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli -am
$@)
# Actually build the jar
echo -e "\nBuilding with..."
@@ -149,6 +149,7 @@ function build_service {
mkdir -p "$DIST_DIR/jars"
mkdir -p "$DIST_DIR/master-jars"
mkdir -p "$DIST_DIR/worker-jars"
+ mkdir -p "$DIST_DIR/cli-jars"
## Copy master jars
cp "$PROJECT_DIR"/master/target/celeborn-master_$SCALA_VERSION-$VERSION.jar
"$DIST_DIR/master-jars/"
@@ -162,6 +163,12 @@ function build_service {
for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .)
done
+ ## Copy cli jars
+ cp "$PROJECT_DIR"/cli/target/celeborn-cli_$SCALA_VERSION-$VERSION.jar
"$DIST_DIR/cli-jars/"
+ cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar
"$DIST_DIR/jars/"
+ for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
+ (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
+ done
}
function build_spark_client {
@@ -273,6 +280,7 @@ function sbt_build_service {
mkdir -p "$DIST_DIR/jars"
mkdir -p "$DIST_DIR/master-jars"
mkdir -p "$DIST_DIR/worker-jars"
+ mkdir -p "$DIST_DIR/cli-jars"
## Copy master jars
cp
"$PROJECT_DIR"/master/target/scala-$SCALA_VERSION/celeborn-master_$SCALA_VERSION-$VERSION.jar
"$DIST_DIR/master-jars/"
@@ -286,6 +294,12 @@ function sbt_build_service {
for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .)
done
+ ## Copy cli jars
+ cp
"$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/celeborn-cli_$SCALA_VERSION-$VERSION.jar
"$DIST_DIR/cli-jars/"
+ cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar
"$DIST_DIR/jars/"
+ for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
+ (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
+ done
}
function sbt_build_client {
diff --git a/build/release/release.sh b/build/release/release.sh
index ae381fedd..f12342b57 100755
--- a/build/release/release.sh
+++ b/build/release/release.sh
@@ -133,6 +133,9 @@ upload_nexus_staging() {
echo "Deploying celeborn-spi"
${PROJECT_DIR}/build/sbt "clean;celeborn-spi/publishSigned"
+
+ echo "Deploying celeborn-cli"
+ ${PROJECT_DIR}/build/sbt "clean;celeborn-cli/publishSigned"
}
finalize_svn() {
diff --git a/cli/pom.xml b/cli/pom.xml
new file mode 100644
index 000000000..b8773149a
--- /dev/null
+++ b/cli/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-cli_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn CLI</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>info.picocli</groupId>
+ <artifactId>picocli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-openapi-client_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-master_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala
b/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala
new file mode 100644
index 000000000..fb2081d7f
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli
+import picocli.CommandLine
+import picocli.CommandLine.Command
+
+import org.apache.celeborn.cli.common.{CliLogging, CliVersionProvider}
+import org.apache.celeborn.cli.master.MasterSubcommandImpl
+import org.apache.celeborn.cli.worker.WorkerSubcommandImpl
+/**
+ * Root picocli command of the Celeborn CLI.
+ *
+ * Registers MasterSubcommandImpl and WorkerSubcommandImpl as subcommands, enables picocli's
+ * standard help options and takes its version text from CliVersionProvider.
+ */
+@Command(
+ name = "celeborn-cli",
+ versionProvider = classOf[CliVersionProvider],
+ mixinStandardHelpOptions = true,
+ description = Array("@|bold Scala|@ Celeborn CLI"),
+ subcommands = Array(
+ classOf[MasterSubcommandImpl],
+ classOf[WorkerSubcommandImpl]))
+class CelebornCli extends Runnable with CliLogging {
+ // The root command has no action of its own: it only reports that a subcommand is required
+ // and points the user at the help output.
+ override def run(): Unit = {
+ logError(
+ "Master or Worker subcommand needs to be provided. Please run -h to see
the usage info.")
+ }
+}
+
+object CelebornCli {
+
+  /**
+   * CLI entry point: builds the picocli command line for [[CelebornCli]], makes option and
+   * subcommand matching case-sensitive, and executes it with the given arguments.
+   *
+   * @param args raw command-line arguments
+   */
+  def main(args: Array[String]): Unit = {
+    val cmd = new CommandLine(new CelebornCli())
+    cmd.setOptionsCaseInsensitive(false)
+    cmd.setSubcommandsCaseInsensitive(false)
+    // Execute the instance configured above. Building a second CommandLine here would
+    // silently discard those settings.
+    cmd.execute(args: _*)
+  }
+}
diff --git a/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala
b/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala
new file mode 100644
index 000000000..d389a1452
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.common
+
+import Console.{GREEN, RED, RESET}
+
+/**
+ * Minimal console logging helpers for CLI commands. Everything is written to standard output
+ * through `Console.println`.
+ */
+trait CliLogging {
+
+  /** Prints a plain, uncoloured message. */
+  def log(msg: String): Unit = {
+    Console.println(msg)
+  }
+
+  /** Prints the string form of an arbitrary object. */
+  def log(obj: Any): Unit = {
+    Console.println(obj)
+  }
+
+  /** Prints an informational message in green. */
+  def logInfo(msg: String): Unit = {
+    // Reset after the message too, otherwise the colour leaks into all later output.
+    Console.println(s"${RESET}${GREEN}${msg}${RESET}")
+  }
+
+  /** Prints an error message in red. */
+  def logError(msg: String): Unit = {
+    // Reset after the message too, otherwise the colour leaks into all later output.
+    Console.println(s"${RESET}${RED}${msg}${RESET}")
+  }
+
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala
b/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala
new file mode 100644
index 000000000..2dd48267b
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.common
+
+import java.nio.file.{Files, Paths}
+
+import scala.io.Source
+
+import picocli.CommandLine.IVersionProvider
+
+import org.apache.celeborn.common.util.Utils
+
+/**
+ * picocli version provider for the CLI. Reports the Celeborn version token found in the
+ * `RELEASE` file under `CELEBORN_HOME`, or just the "Celeborn CLI" label when the file is
+ * missing or contains no version line.
+ */
+class CliVersionProvider extends IVersionProvider with CliLogging {
+
+  private val versionPattern = """Celeborn\s+\S+""".r
+  private val prefix = "Celeborn CLI"
+
+  /** Returns a one-element array holding the version banner. */
+  override def getVersion: Array[String] = {
+    val celebornHome = sys.env.getOrElse("CELEBORN_HOME", "")
+    val releaseFile = Paths.get(celebornHome + "/RELEASE")
+
+    if (!Files.exists(releaseFile)) {
+      logInfo(
+        "Could not resolve version of Celeborn since no RELEASE file was found in $CELEBORN_HOME.")
+      Array(prefix)
+    } else {
+      Utils.tryWithResources(Source.fromFile(releaseFile.toFile)) { source =>
+        // Scan lazily and stop at the first line that contains a version token.
+        val version = source.getLines().map(line => versionPattern.findFirstIn(line)).collectFirst {
+          case Some(found) => found
+        }
+        version match {
+          case Some(found) =>
+            Array(s"$prefix - $found")
+          case None =>
+            logInfo(
+              "Could not resolve version of Celeborn since RELEASE file did not contain version info.")
+            Array(prefix)
+        }
+      }
+    }
+  }
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
new file mode 100644
index 000000000..507d5374f
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.common
+
+import picocli.CommandLine.{Command, Option, Spec}
+import picocli.CommandLine.Model.CommandSpec
+
+/**
+ * Command-line options that are not tied to a single action: the endpoint to talk to
+ * (`--hostport` or a `--cluster` alias), host and worker-id lists, and the selectors used for
+ * dynamic config queries. MasterSubcommand pulls these in through a picocli `@Mixin`.
+ * Every option is a plain String field that picocli fills in from the command line.
+ */
+@Command
+class CommonOptions {
+
+ @Spec var spec: CommandSpec = _ // injected by picocli
+
+ // Explicit endpoint, given as host:port.
+ @Option(
+ names = Array("--hostport"),
+ paramLabel = "host:port",
+ description = Array("The host and http port"))
+ private[cli] var hostPort: String = _
+
+ // Comma-separated host list, kept as the raw string.
+ @Option(
+ names = Array("--host-list"),
+ paramLabel = "h1,h2,h3...",
+ description = Array("List of hosts to pass to the command"))
+ private[cli] var hostList: String = _
+
+ // Comma-separated worker ids, kept as the raw string.
+ @Option(
+ names = Array("--worker-ids"),
+ paramLabel = "w1,w2,w3...",
+ description =
+ Array("List of workerIds to pass to the command. Each worker should be
in the format" +
+ " host:rpcPort:pushPort:fetchPort:replicatePort."))
+ private[cli] var workerIds: String = _
+
+ // Alias under which master endpoints are stored in the CLI config file.
+ @Option(
+ names = Array("--cluster"),
+ paramLabel = "cluster_alias",
+ description = Array("The alias of the cluster to use to query masters"))
+ private[cli] var cluster: String = _
+
+ // Required for getting dynamic config info
+ @Option(
+ names = Array("--config-level"),
+ paramLabel = "level",
+ description = Array("The config level of the dynamic configs"))
+ private[cli] var configLevel: String = _
+
+ // Required for getting dynamic config info
+ @Option(
+ names = Array("--config-tenant"),
+ paramLabel = "tenant_id",
+ description = Array("The tenant id of TENANT or TENANT_USER level."))
+ private[cli] var configTenant: String = _
+
+ // Required for getting dynamic config info
+ @Option(
+ names = Array("--config-name"),
+ paramLabel = "username",
+ description = Array("The username of the TENANT_USER level."))
+ private[cli] var configName: String = _
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala
b/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala
new file mode 100644
index 000000000..873a5cbf1
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.config
+
+import java.io.{File, FileInputStream, FileOutputStream}
+import java.time.LocalDateTime
+import java.util.Properties
+
+import org.apache.celeborn.cli.common.CliLogging
+import org.apache.celeborn.common.util.Utils
+
+/** Immutable snapshot of the key/value pairs held in the CLI config file. */
+case class CliConfig(cliConfigData: Map[String, String])
+
+object CliConfigManager {
+
+  /**
+   * Path of the per-user CLI config file. The `HOME` environment variable is preferred; when
+   * it is not set, the JVM's `user.home` property is used instead so that initializing this
+   * object does not fail with NoSuchElementException.
+   */
+  val cliConfigFilePath: String = {
+    val homeDir = sys.env.getOrElse("HOME", sys.props("user.home"))
+    s"$homeDir/.celeborn-cli.conf"
+  }
+}
+
+/**
+ * Reads and writes the CLI config file at `CliConfigManager.cliConfigFilePath`, stored in Java
+ * properties format. Each mutating call re-reads the file, applies the change and rewrites it.
+ */
+class CliConfigManager extends CliLogging {
+
+  private val properties = new Properties()
+
+  /**
+   * Loads the config file from disk.
+   *
+   * @return the stored entries, or None when the config file does not exist
+   */
+  def loadConfig(): Option[CliConfig] = {
+    val file = new File(CliConfigManager.cliConfigFilePath)
+    if (!file.exists()) {
+      None
+    } else {
+      Utils.tryWithResources(new FileInputStream(file)) { inputStream =>
+        // Properties.load only adds or overwrites entries, so drop whatever an earlier load
+        // left behind; otherwise keys deleted from the file since then would still show up.
+        properties.clear()
+        properties.load(inputStream)
+        Some(CliConfig(properties.stringPropertyNames().toArray.map(_.toString).map { key =>
+          key -> properties.getProperty(key)
+        }.toMap))
+      }
+    }
+  }
+
+  /**
+   * Replaces the file content with the given entries, creating the file (and its parent
+   * directories) when needed. Failures are logged and not rethrown.
+   */
+  private def saveConfig(cliConfig: CliConfig): Unit = {
+    try {
+      val file = new File(CliConfigManager.cliConfigFilePath)
+      if (!file.exists()) {
+        file.getParentFile.mkdirs()
+        file.createNewFile()
+      }
+      properties.clear()
+      val outputStream = new FileOutputStream(file)
+      Utils.tryWithResources(outputStream) { os =>
+        cliConfig.cliConfigData.foreach { case (key, value) =>
+          properties.setProperty(key, value)
+        }
+        properties.store(os, s"Last updated conf at ${LocalDateTime.now()}")
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Error saving config: ${e.getMessage}")
+    }
+  }
+
+  /** Adds or overwrites `key` with `value` and persists the result. */
+  def add(key: String, value: String): Unit = {
+    val config = loadConfig().getOrElse(CliConfig(Map()))
+    val updatedConfig = config.copy(cliConfigData = config.cliConfigData + (key -> value))
+    saveConfig(updatedConfig)
+  }
+
+  /** Removes `key` (if present) and persists the result. */
+  def remove(key: String): Unit = {
+    val config = loadConfig().getOrElse(CliConfig(Map()))
+    val updatedConfig = config.copy(cliConfigData = config.cliConfigData - key)
+    saveConfig(updatedConfig)
+  }
+
+  /** Returns the value stored under `key`, or None when the file or the key is missing. */
+  def get(key: String): Option[String] = {
+    loadConfig().flatMap(config => config.cliConfigData.get(key))
+  }
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
new file mode 100644
index 000000000..b22f0bb3f
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import picocli.CommandLine.Option
+
+/**
+ * Action selectors of the `master` subcommand, one picocli option per action.
+ * MasterSubcommand declares this class as an `@ArgGroup(exclusive = true, multiplicity = "1")`.
+ * Most actions are Boolean switches; `--send-worker-event` and the two cluster-alias options
+ * take a String value instead.
+ */
+final class MasterOptions {
+
+ @Option(names = Array("--show-masters-info"), description = Array("Show
master group info"))
+ private[master] var showMastersInfo: Boolean = _
+
+ @Option(names = Array("--show-cluster-apps"), description = Array("Show
cluster applications"))
+ private[master] var showClusterApps: Boolean = _
+
+ @Option(names = Array("--show-cluster-shuffles"), description = Array("Show
cluster shuffles"))
+ private[master] var showClusterShuffles: Boolean = _
+
+ @Option(
+ names = Array("--show-top-disk-used-apps"),
+ description = Array("Show top disk used apps"))
+ private[master] var showTopDiskUsedApps: Boolean = _
+
+ @Option(names = Array("--exclude-worker"), description = Array("Exclude
workers by ID"))
+ private[master] var excludeWorkers: Boolean = _
+
+ @Option(
+ names = Array("--remove-excluded-worker"),
+ description = Array("Remove excluded workers by ID"))
+ private[master] var removeExcludedWorkers: Boolean = _
+
+ // Takes the event type as its value (see paramLabel), unlike the Boolean switches above.
+ @Option(
+ names = Array("--send-worker-event"),
+ paramLabel = "IMMEDIATELY | DECOMMISSION | DECOMMISSION_THEN_IDLE |
GRACEFUL",
+ description = Array("Send an event to a worker"))
+ private[master] var sendWorkerEvent: String = _
+
+ @Option(
+ names = Array("--show-worker-event-info"),
+ description = Array("Show worker event information"))
+ private[master] var showWorkerEventInfo: Boolean = _
+
+ @Option(names = Array("--show-lost-workers"), description = Array("Show lost
workers"))
+ private[master] var showLostWorkers: Boolean = _
+
+ @Option(names = Array("--show-excluded-workers"), description = Array("Show
excluded workers"))
+ private[master] var showExcludedWorkers: Boolean = _
+
+ @Option(names = Array("--show-shutdown-workers"), description = Array("Show
shutdown workers"))
+ private[master] var showShutdownWorkers: Boolean = _
+
+ @Option(
+ names = Array("--show-lifecycle-managers"),
+ description = Array("Show lifecycle managers"))
+ private[master] var showLifecycleManagers: Boolean = _
+
+ @Option(names = Array("--show-workers"), description = Array("Show
registered workers"))
+ private[master] var showWorkers: Boolean = _
+
+ @Option(names = Array("--show-conf"), description = Array("Show master
conf"))
+ private[master] var showConf: Boolean = _
+
+ @Option(names = Array("--show-dynamic-conf"), description = Array("Show
dynamic master conf"))
+ private[master] var showDynamicConf: Boolean = _
+
+ @Option(names = Array("--show-thread-dump"), description = Array("Show
master thread dump"))
+ private[master] var showThreadDump: Boolean = _
+
+ // The two alias options below take the alias name as their value.
+ @Option(
+ names = Array("--add-cluster-alias"),
+ paramLabel = "alias",
+ description = Array("Add alias to use in the cli for the given set of
masters"))
+ private[master] var addClusterAlias: String = _
+
+ @Option(
+ names = Array("--remove-cluster-alias"),
+ paramLabel = "alias",
+ description = Array("Remove alias to use in the cli for the given set of
masters"))
+ private[master] var removeClusterAlias: String = _
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
new file mode 100644
index 000000000..756ebb89d
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import java.util
+
+import picocli.CommandLine.{ArgGroup, Mixin, ParameterException,
ParentCommand, Spec}
+import picocli.CommandLine.Model.CommandSpec
+
+import org.apache.celeborn.cli.CelebornCli
+import org.apache.celeborn.cli.common.{CliLogging, CommonOptions}
+import org.apache.celeborn.cli.config.CliConfigManager
+import org.apache.celeborn.rest.v1.master.{ApplicationApi, ConfApi,
DefaultApi, MasterApi, ShuffleApi, WorkerApi}
+import org.apache.celeborn.rest.v1.master.invoker.ApiClient
+import org.apache.celeborn.rest.v1.model._
+
+/**
+ * Base of the `master` subcommand. Holds the picocli-injected option objects, builds the REST
+ * API clients for the selected master endpoint and declares the actions that the concrete
+ * subcommand has to implement.
+ */
+trait MasterSubcommand extends CliLogging {
+
+  // The fields below are injected by picocli.
+  @ParentCommand
+  private var celebornCli: CelebornCli = _
+
+  @ArgGroup(exclusive = true, multiplicity = "1")
+  private[master] var masterOptions: MasterOptions = _
+
+  @Mixin
+  private[master] var commonOptions: CommonOptions = _
+
+  @Spec
+  private[master] var spec: CommandSpec = _
+
+  private[master] val cliConfigManager = new CliConfigManager
+
+  /**
+   * Builds an API client for the master to query. `--hostport` wins when given; otherwise the
+   * first of the comma-separated endpoints stored under the `--cluster` alias is used.
+   *
+   * @throws ParameterException when neither source yields a non-empty endpoint
+   */
+  private def apiClient: ApiClient = {
+    val connectionUrl =
+      if (commonOptions.hostPort != null && commonOptions.hostPort.nonEmpty) {
+        commonOptions.hostPort
+      } else {
+        // Only consult the config file when an alias was actually passed, and tolerate
+        // whitespace around the stored endpoints.
+        val endpoints =
+          Option(commonOptions.cluster).flatMap(alias => cliConfigManager.get(alias))
+        endpoints.getOrElse("").split(",")(0).trim
+      }
+    if (connectionUrl != null && connectionUrl.nonEmpty) {
+      log(s"Using connectionUrl: $connectionUrl")
+      new ApiClient().setBasePath(s"http://$connectionUrl")
+    } else {
+      throw new ParameterException(
+        spec.commandLine(),
+        "No valid connection url found, please provide either --hostport or " +
+          "valid cluster alias.")
+    }
+  }
+
+  // Each accessor creates a new API object on top of a freshly resolved ApiClient.
+  private[master] def applicationApi: ApplicationApi = new ApplicationApi(apiClient)
+  private[master] def confApi: ConfApi = new ConfApi(apiClient)
+  private[master] def defaultApi: DefaultApi = new DefaultApi(apiClient)
+  private[master] def masterApi: MasterApi = new MasterApi(apiClient)
+  private[master] def shuffleApi: ShuffleApi = new ShuffleApi(apiClient)
+  private[master] def workerApi: WorkerApi = new WorkerApi(apiClient)
+
+  // Actions to be provided by the concrete subcommand.
+
+  private[master] def runShowMastersInfo: MasterInfoResponse
+
+  private[master] def runShowClusterApps: ApplicationsHeartbeatResponse
+
+  private[master] def runShowClusterShuffles: ShufflesResponse
+
+  private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse
+
+  private[master] def runExcludeWorkers: HandleResponse
+
+  private[master] def runRemoveExcludedWorkers: HandleResponse
+
+  private[master] def runSendWorkerEvent: HandleResponse
+
+  private[master] def runShowWorkerEventInfo: WorkerEventsResponse
+
+  private[master] def runShowLostWorkers: Seq[WorkerTimestampData]
+
+  private[master] def runShowExcludedWorkers: Seq[WorkerData]
+
+  private[master] def runShowShutdownWorkers: Seq[WorkerData]
+
+  private[master] def runShowLifecycleManagers: HostnamesResponse
+
+  private[master] def runShowWorkers: WorkersResponse
+
+  private[master] def getWorkerIds: util.List[WorkerId]
+
+  private[master] def runShowConf: ConfResponse
+
+  private[master] def runShowDynamicConf: DynamicConfigResponse
+
+  private[master] def runShowThreadDump: ThreadStackResponse
+
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
new file mode 100644
index 000000000..50a643ca8
--- /dev/null
+++
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import picocli.CommandLine.{Command, ParameterException}
+
+import org.apache.celeborn.cli.config.CliConfigManager
+import org.apache.celeborn.rest.v1.model._
+import org.apache.celeborn.rest.v1.model.SendWorkerEventRequest.EventTypeEnum
+
+/**
+ * picocli `master` subcommand. `run()` executes every action whose option is set in
+ * `masterOptions`; the REST-backed actions log the response they receive.
+ */
+@Command(name = "master", mixinStandardHelpOptions = true)
+class MasterSubcommandImpl extends Runnable with MasterSubcommand {
+
+  /** Runs each requested action in a fixed order; alias actions log their own outcome. */
+  override def run(): Unit = {
+    if (masterOptions.showMastersInfo) log(runShowMastersInfo)
+    if (masterOptions.showClusterApps) log(runShowClusterApps)
+    if (masterOptions.showClusterShuffles) log(runShowClusterShuffles)
+    if (masterOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
+    if (masterOptions.excludeWorkers) log(runExcludeWorkers)
+    if (masterOptions.removeExcludedWorkers) log(runRemoveExcludedWorkers)
+    if (masterOptions.sendWorkerEvent != null && masterOptions.sendWorkerEvent.nonEmpty)
+      log(runSendWorkerEvent)
+    if (masterOptions.showWorkerEventInfo) log(runShowWorkerEventInfo)
+    if (masterOptions.showLostWorkers) log(runShowLostWorkers)
+    if (masterOptions.showExcludedWorkers) log(runShowExcludedWorkers)
+    if (masterOptions.showShutdownWorkers) log(runShowShutdownWorkers)
+    if (masterOptions.showLifecycleManagers) log(runShowLifecycleManagers)
+    if (masterOptions.showWorkers) log(runShowWorkers)
+    if (masterOptions.showConf) log(runShowConf)
+    if (masterOptions.showDynamicConf) log(runShowDynamicConf)
+    if (masterOptions.showThreadDump) log(runShowThreadDump)
+    if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty)
+      runAddClusterAlias
+    if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty)
+      runRemoveClusterAlias
+  }
+
+  /** Returns the result of `masterApi.getMasterGroupInfo`. */
+  private[master] def runShowMastersInfo: MasterInfoResponse = masterApi.getMasterGroupInfo
+
+  /** Returns the result of `applicationApi.getApplications`. */
+  private[master] def runShowClusterApps: ApplicationsHeartbeatResponse =
+    applicationApi.getApplications
+
+  /** Returns the result of `shuffleApi.getShuffles`. */
+  private[master] def runShowClusterShuffles: ShufflesResponse = shuffleApi.getShuffles
+
+  /** Returns the result of `applicationApi.getApplicationsDiskUsageSnapshots`. */
+  private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse =
+    applicationApi.getApplicationsDiskUsageSnapshots
+
+  /** Sends an exclude request that adds the workers parsed by `getWorkerIds`. */
+  private[master] def runExcludeWorkers: HandleResponse = {
+    val workerIds = getWorkerIds
+    val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds)
+    logInfo(s"Sending exclude worker requests to workers: $workerIds")
+    workerApi.excludeWorker(excludeWorkerRequest)
+  }
+
+  /** Sends an exclude request that removes the workers parsed by `getWorkerIds`. */
+  private[master] def runRemoveExcludedWorkers: HandleResponse = {
+    val workerIds = getWorkerIds
+    val removeExcludeWorkerRequest = new ExcludeWorkerRequest().remove(workerIds)
+    logInfo(s"Sending remove exclude worker requests to workers: $workerIds")
+    workerApi.excludeWorker(removeExcludeWorkerRequest)
+  }
+
+  /**
+   * Sends the worker event named by `masterOptions.sendWorkerEvent` to the workers parsed by
+   * `getWorkerIds`. The event name is matched case-insensitively.
+   *
+   * Throws a `ParameterException` when the name is not an `EventTypeEnum` constant.
+   */
+  private[master] def runSendWorkerEvent: HandleResponse = {
+    val eventType = {
+      try {
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+        EventTypeEnum.valueOf(masterOptions.sendWorkerEvent.toUpperCase(util.Locale.ROOT))
+      } catch {
+        case _: IllegalArgumentException => throw new ParameterException(
+            spec.commandLine(),
+            "Worker event type must be " +
+              "IMMEDIATELY, DECOMMISSION, DECOMMISSION_THEN_IDLE, or GRACEFUL")
+      }
+    }
+    val workerIds = getWorkerIds
+    val sendWorkerEventRequest =
+      new SendWorkerEventRequest().workers(workerIds).eventType(eventType)
+    logInfo(s"Sending workerEvent $eventType to workers: $workerIds")
+    workerApi.sendWorkerEvent(sendWorkerEventRequest)
+  }
+
+  /** Returns the result of `workerApi.getWorkerEvents`. */
+  private[master] def runShowWorkerEventInfo: WorkerEventsResponse = workerApi.getWorkerEvents
+
+  /** Lost workers from the workers response, sorted by host; logs a notice when there are none. */
+  private[master] def runShowLostWorkers: Seq[WorkerTimestampData] = {
+    val lostWorkers = runShowWorkers.getLostWorkers.asScala.toSeq
+    if (lostWorkers.isEmpty) {
+      log("No lost workers found.")
+      Seq.empty[WorkerTimestampData]
+    } else {
+      lostWorkers.sortBy(_.getWorker.getHost)
+    }
+  }
+
+  /** Excluded workers from the workers response, sorted by host; logs a notice when there are none. */
+  private[master] def runShowExcludedWorkers: Seq[WorkerData] = {
+    val excludedWorkers = runShowWorkers.getExcludedWorkers.asScala.toSeq
+    if (excludedWorkers.isEmpty) {
+      log("No excluded workers found.")
+      Seq.empty[WorkerData]
+    } else {
+      excludedWorkers.sortBy(_.getHost)
+    }
+  }
+
+  /** Shutdown workers from the workers response, sorted by host; logs a notice when there are none. */
+  private[master] def runShowShutdownWorkers: Seq[WorkerData] = {
+    val shutdownWorkers = runShowWorkers.getShutdownWorkers.asScala.toSeq
+    if (shutdownWorkers.isEmpty) {
+      log("No shutdown workers found.")
+      Seq.empty[WorkerData]
+    } else {
+      shutdownWorkers.sortBy(_.getHost)
+    }
+  }
+
+  /** Returns the result of `applicationApi.getApplicationHostNames`. */
+  private[master] def runShowLifecycleManagers: HostnamesResponse =
+    applicationApi.getApplicationHostNames
+
+  /** Returns the result of `workerApi.getWorkers`. */
+  private[master] def runShowWorkers: WorkersResponse = workerApi.getWorkers
+
+  /**
+   * Parses `commonOptions.workerIds`, a comma-separated list of
+   * `host:rpcPort:pushPort:fetchPort:replicatePort` entries, into `WorkerId` objects.
+   *
+   * Throws a `ParameterException` when the option is missing or an entry is malformed.
+   */
+  private[master] def getWorkerIds: util.List[WorkerId] = {
+    val workerIds = commonOptions.workerIds
+    if (workerIds == null || workerIds.isEmpty) {
+      throw new ParameterException(
+        spec.commandLine(),
+        "Host list must be provided for this command.")
+    }
+    workerIds
+      .trim
+      .split(",")
+      .map(parseWorkerId)
+      .toList
+      .asJava
+  }
+
+  /** Converts one `host:rpcPort:pushPort:fetchPort:replicatePort` entry into a `WorkerId`. */
+  private def parseWorkerId(workerId: String): WorkerId = {
+    val expectedFormat = "host:rpcPort:pushPort:fetchPort:replicatePort"
+    val fields = workerId.trim.split(":")
+    // Report a usage error instead of letting the index lookups below throw.
+    if (fields.length < 5) {
+      throw new ParameterException(
+        spec.commandLine(),
+        s"Invalid worker id '$workerId', expected format is $expectedFormat.")
+    }
+    try {
+      new WorkerId()
+        .host(fields(0))
+        .rpcPort(fields(1).toInt)
+        .pushPort(fields(2).toInt)
+        .fetchPort(fields(3).toInt)
+        .replicatePort(fields(4).toInt)
+    } catch {
+      case _: NumberFormatException => throw new ParameterException(
+          spec.commandLine(),
+          s"Invalid worker id '$workerId', ports must be integers in $expectedFormat.")
+    }
+  }
+
+  /** Returns the result of `confApi.getConf`. */
+  private[master] def runShowConf: ConfResponse = confApi.getConf
+
+  /** Queries the dynamic conf using the level, tenant and name from `commonOptions`. */
+  private[master] def runShowDynamicConf: DynamicConfigResponse =
+    confApi.getDynamicConf(
+      commonOptions.configLevel,
+      commonOptions.configTenant,
+      commonOptions.configName)
+
+  /** Returns the result of `defaultApi.getThreadDump`. */
+  private[master] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump
+
+  /**
+   * Stores `commonOptions.hostList` under the alias given by `masterOptions.addClusterAlias`.
+   *
+   * Throws a `ParameterException` when no host list was supplied.
+   */
+  private[master] def runAddClusterAlias: Unit = {
+    val aliasToAdd = masterOptions.addClusterAlias
+    val hosts = commonOptions.hostList
+    if (hosts == null || hosts.isEmpty) {
+      throw new ParameterException(
+        spec.commandLine(),
+        "Host list must be supplied via --host-list to add to alias.")
+    }
+    cliConfigManager.add(aliasToAdd, hosts)
+    logInfo(s"Cluster alias $aliasToAdd added to ${CliConfigManager.cliConfigFilePath}. You can now use the --cluster" +
+      s" command with this alias.")
+  }
+
+  /** Removes the alias given by `masterOptions.removeClusterAlias` from the CLI config. */
+  private[master] def runRemoveClusterAlias: Unit = {
+    val aliasToRemove = masterOptions.removeClusterAlias
+    cliConfigManager.remove(aliasToRemove)
+    logInfo(s"Cluster alias $aliasToRemove removed.")
+  }
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
new file mode 100644
index 000000000..a0a1c25a7
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.worker
+
+import picocli.CommandLine.Option
+
+/**
+ * Option holder for the `worker` subcommand. Each field is bound by picocli to the flag named
+ * in its `@Option` annotation; boolean flags are false and `exitType` is null until set.
+ */
+final class WorkerOptions {
+
+  @Option(
+    names = Array("--show-worker-info"),
+    description = Array("Show worker info"))
+  private[worker] var showWorkerInfo: Boolean = _
+
+  @Option(
+    names = Array("--show-apps-on-worker"),
+    description = Array("Show applications running on the worker"))
+  private[worker] var showAppsOnWorker: Boolean = _
+
+  @Option(
+    names = Array("--show-shuffles-on-worker"),
+    description = Array("Show shuffles running on the worker"))
+  private[worker] var showShufflesOnWorker: Boolean = _
+
+  @Option(
+    names = Array("--show-top-disk-used-apps"),
+    description = Array("Show top disk used applications"))
+  private[worker] var showTopDiskUsedApps: Boolean = _
+
+  @Option(
+    names = Array("--show-partition-location-info"),
+    description = Array("Show partition location information"))
+  private[worker] var showPartitionLocationInfo: Boolean = _
+
+  @Option(
+    names = Array("--show-unavailable-peers"),
+    description = Array("Show unavailable peers"))
+  private[worker] var showUnavailablePeers: Boolean = _
+
+  @Option(
+    names = Array("--is-shutdown"),
+    description = Array("Check if the system is shutdown"))
+  private[worker] var isShutdown: Boolean = _
+
+  @Option(
+    names = Array("--is-registered"),
+    description = Array("Check if the system is registered"))
+  private[worker] var isRegistered: Boolean = _
+
+  // The only option here that takes a value: the exit type to request from the worker.
+  @Option(
+    names = Array("--exit"),
+    paramLabel = "exit_type",
+    description = Array("Exit the application with a specified type"))
+  private[worker] var exitType: String = _
+
+  @Option(
+    names = Array("--show-conf"),
+    description = Array("Show worker conf"))
+  private[worker] var showConf: Boolean = _
+
+  @Option(
+    names = Array("--show-dynamic-conf"),
+    description = Array("Show dynamic worker conf"))
+  private[worker] var showDynamicConf: Boolean = _
+
+  @Option(
+    names = Array("--show-thread-dump"),
+    description = Array("Show worker thread dump"))
+  private[worker] var showThreadDump: Boolean = _
+
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
new file mode 100644
index 000000000..7690c364e
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.worker
+
+import picocli.CommandLine.{ArgGroup, Mixin, ParameterException,
ParentCommand, Spec}
+import picocli.CommandLine.Model.CommandSpec
+
+import org.apache.celeborn.cli.CelebornCli
+import org.apache.celeborn.cli.common.{CliLogging, CommonOptions}
+import org.apache.celeborn.rest.v1.model._
+import org.apache.celeborn.rest.v1.worker.{ApplicationApi, ConfApi,
DefaultApi, ShuffleApi, WorkerApi}
+import org.apache.celeborn.rest.v1.worker.invoker.ApiClient
+
+/**
+ * Shared state and contract of the `worker` subcommand: the picocli-injected options and
+ * command spec, the REST API accessors built on `apiClient`, and one abstract `run*` method
+ * per supported action.
+ */
+trait WorkerSubcommand extends CliLogging {
+
+  @ParentCommand
+  private var celebornCli: CelebornCli = _
+
+  // Exactly one of the worker options must be supplied per invocation.
+  @ArgGroup(exclusive = true, multiplicity = "1")
+  private[worker] var workerOptions: WorkerOptions = _
+
+  @Mixin
+  private[worker] var commonOptions: CommonOptions = _
+
+  @Spec
+  private[worker] var spec: CommandSpec = _
+
+  /**
+   * Builds a client whose base path is `http://` followed by `commonOptions.hostPort`.
+   * Throws a `ParameterException` when no host/port was supplied.
+   */
+  private[worker] def apiClient: ApiClient = {
+    val hostPort = commonOptions.hostPort
+    if (hostPort == null || hostPort.isEmpty) {
+      throw new ParameterException(
+        spec.commandLine(),
+        "No valid connection url found, please provide --hostport.")
+    }
+    log(s"Using connectionUrl: $hostPort")
+    new ApiClient().setBasePath(s"http://$hostPort")
+  }
+
+  // Each accessor wraps a newly built apiClient in the matching REST API class.
+  private[worker] def applicationApi: ApplicationApi = new ApplicationApi(apiClient)
+  private[worker] def confApi: ConfApi = new ConfApi(apiClient)
+  private[worker] def defaultApi: DefaultApi = new DefaultApi(apiClient)
+  private[worker] def shuffleApi: ShuffleApi = new ShuffleApi(apiClient)
+  private[worker] def workerApi: WorkerApi = new WorkerApi(apiClient)
+
+  // Actions implemented by the concrete subcommand class.
+  private[worker] def runShowWorkerInfo: WorkerInfoResponse
+
+  private[worker] def runShowAppsOnWorker: ApplicationsResponse
+
+  private[worker] def runShowShufflesOnWorker: ShufflesResponse
+
+  private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse
+
+  private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse
+
+  private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse
+
+  private[worker] def runIsShutdown: Boolean
+
+  private[worker] def runIsRegistered: Boolean
+
+  private[worker] def runExit: HandleResponse
+
+  private[worker] def runShowConf: ConfResponse
+
+  private[worker] def runShowDynamicConf: DynamicConfigResponse
+
+  private[worker] def runShowThreadDump: ThreadStackResponse
+
+}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
new file mode 100644
index 000000000..eee9e56dd
--- /dev/null
+++
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.worker
+
+import picocli.CommandLine.Command
+
+import org.apache.celeborn.rest.v1.model._
+import org.apache.celeborn.rest.v1.model.WorkerExitRequest.TypeEnum
+
+/**
+ * picocli `worker` subcommand. `run()` executes the action whose option is set in
+ * `workerOptions` and logs the value it returns.
+ */
+@Command(name = "worker", mixinStandardHelpOptions = true)
+class WorkerSubcommandImpl extends Runnable with WorkerSubcommand {
+
+  /** Runs each requested action in a fixed order and logs whatever it returns. */
+  override def run(): Unit = {
+    if (workerOptions.showWorkerInfo) log(runShowWorkerInfo)
+    if (workerOptions.showAppsOnWorker) log(runShowAppsOnWorker)
+    if (workerOptions.showShufflesOnWorker) log(runShowShufflesOnWorker)
+    if (workerOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
+    if (workerOptions.showPartitionLocationInfo) log(runShowPartitionLocationInfo)
+    if (workerOptions.showUnavailablePeers) log(runShowUnavailablePeers)
+    if (workerOptions.isShutdown) log(runIsShutdown)
+    if (workerOptions.isRegistered) log(runIsRegistered)
+    if (workerOptions.exitType != null && workerOptions.exitType.nonEmpty) log(runExit)
+    if (workerOptions.showConf) log(runShowConf)
+    if (workerOptions.showDynamicConf) log(runShowDynamicConf)
+    if (workerOptions.showThreadDump) log(runShowThreadDump)
+  }
+
+  /** Returns the result of `workerApi.getWorkerInfo`. */
+  private[worker] def runShowWorkerInfo: WorkerInfoResponse = workerApi.getWorkerInfo
+
+  /** Returns the result of `applicationApi.getApplicationList`. */
+  private[worker] def runShowAppsOnWorker: ApplicationsResponse = applicationApi.getApplicationList
+
+  /** Returns the result of `shuffleApi.getShuffles`. */
+  private[worker] def runShowShufflesOnWorker: ShufflesResponse = shuffleApi.getShuffles
+
+  /** Returns the result of `applicationApi.getApplicationsDiskUsage`. */
+  private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse =
+    applicationApi.getApplicationsDiskUsage
+
+  /** Returns the result of `shuffleApi.getShufflePartitions`. */
+  private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse =
+    shuffleApi.getShufflePartitions
+
+  /** Returns the result of `workerApi.unavailablePeers()`. */
+  private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse =
+    workerApi.unavailablePeers()
+
+  /** Reads the shutdown flag from a fresh worker info response. */
+  private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown
+
+  /** Reads the registered flag from a fresh worker info response. */
+  private[worker] def runIsRegistered: Boolean = runShowWorkerInfo.getIsRegistered
+
+  /**
+   * Sends a worker exit request of the type named by `workerOptions.exitType`. The name is
+   * matched case-insensitively against the `TypeEnum` constants.
+   *
+   * Throws a picocli `ParameterException` when the name is not a `TypeEnum` constant, so the
+   * user gets a usage error rather than a raw `IllegalArgumentException`.
+   */
+  private[worker] def runExit: HandleResponse = {
+    val workerExitType: TypeEnum =
+      try {
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+        TypeEnum.valueOf(workerOptions.exitType.toUpperCase(java.util.Locale.ROOT))
+      } catch {
+        case _: IllegalArgumentException =>
+          // Fully qualified because this file only imports picocli.CommandLine.Command.
+          throw new picocli.CommandLine.ParameterException(
+            spec.commandLine(),
+            "Worker exit type must be one of: " + TypeEnum.values().map(_.name).mkString(", "))
+      }
+    val workerExitRequest: WorkerExitRequest = new WorkerExitRequest().`type`(workerExitType)
+    logInfo(s"Sending worker exit type: ${workerExitType.getValue}")
+    workerApi.workerExit(workerExitRequest)
+  }
+
+  /** Returns the result of `confApi.getConf`. */
+  private[worker] def runShowConf: ConfResponse = confApi.getConf
+
+  /** Queries the dynamic conf using the level, tenant and name from `commonOptions`. */
+  private[worker] def runShowDynamicConf: DynamicConfigResponse =
+    confApi.getDynamicConf(
+      commonOptions.configLevel,
+      commonOptions.configTenant,
+      commonOptions.configName)
+
+  /** Returns the result of `defaultApi.getThreadDump`. */
+  private[worker] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump
+}
diff --git
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
new file mode 100644
index 000000000..6e90650ac
--- /dev/null
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli
+
+import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.nio.file.{Files, Paths}
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.cli.config.CliConfigManager
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.master.Master
+import org.apache.celeborn.service.deploy.worker.Worker
+
+/**
+ * Runs `CelebornCli.main` against a one-worker mini cluster and checks that stdout contains an
+ * expected fragment for each command. Master commands connect through the `unit-test` cluster
+ * alias registered in `beforeAll`.
+ */
+class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature {
+
+  private val celebornConf = new CelebornConf()
+  protected var master: Master = _
+  protected var worker: Worker = _
+
+  override def beforeAll(): Unit = {
+    logInfo("test initialized, setup celeborn mini cluster")
+    val (startedMaster, startedWorkers) =
+      setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1)
+    master = startedMaster
+    worker = startedWorkers.head
+    super.beforeAll()
+    // Register the alias that prepareMasterArgs() relies on.
+    captureOutputAndValidateResponse(
+      Array("master", "--add-cluster-alias", "unit-test", "--host-list", master.connectionUrl),
+      s"Cluster alias unit-test added to ${CliConfigManager.cliConfigFilePath}")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    logInfo("all test complete, stop celeborn mini cluster")
+    shutdownMiniCluster()
+    captureOutputAndValidateResponse(
+      Array("master", "--remove-cluster-alias", "unit-test"),
+      s"Cluster alias unit-test removed.")
+    val cliConfigManager = new CliConfigManager
+    val aliasExists = cliConfigManager.loadConfig().exists(_.cliConfigData.contains("unit-test"))
+    assert(!aliasExists)
+    // Remove the CLI config file so nothing written by this suite is left on disk.
+    val configFile = new File(CliConfigManager.cliConfigFilePath)
+    if (configFile.exists()) {
+      Files.delete(Paths.get(CliConfigManager.cliConfigFilePath))
+    }
+  }
+
+  registerFlagTests("worker", () => prepareWorkerArgs())(
+    "--show-worker-info" -> Right("WorkerInfoResponse"),
+    "--show-apps-on-worker" -> Right("ApplicationsResponse"),
+    "--show-shuffles-on-worker" -> Right("ShufflesResponse"),
+    "--show-top-disk-used-apps" -> Right("AppDiskUsagesResponse"),
+    "--show-partition-location-info" -> Right("ShufflePartitionsResponse"),
+    "--show-unavailable-peers" -> Right("UnAvailablePeersResponse"),
+    "--is-shutdown" -> Right("false"),
+    "--is-registered" -> Right("true"),
+    "--show-conf" -> Right("ConfResponse"),
+    "--show-dynamic-conf" -> Left(
+      "This test is temporarily disabled since dynamic conf is not enabled in unit tests."),
+    "--show-thread-dump" -> Right("ThreadStackResponse"))
+
+  registerFlagTests("master", () => prepareMasterArgs())(
+    "--show-masters-info" -> Left(
+      "This test is temporarily disabled since HA is not enabled in the unit tests."),
+    "--show-cluster-apps" -> Right("ApplicationsHeartbeatResponse"),
+    "--show-cluster-shuffles" -> Right("ShufflesResponse"),
+    "--show-top-disk-used-apps" -> Right("AppDiskUsageSnapshotsResponse"),
+    "--show-worker-event-info" -> Right("WorkerEventsResponse"),
+    "--show-lost-workers" -> Right("No lost workers found."),
+    "--show-excluded-workers" -> Right("No excluded workers found."),
+    "--show-shutdown-workers" -> Right("No shutdown workers found."),
+    "--show-lifecycle-managers" -> Right("HostnamesResponse"),
+    "--show-workers" -> Right("WorkersResponse"),
+    "--show-conf" -> Right("ConfResponse"),
+    "--show-dynamic-conf" -> Left(
+      "This test is temporarily disabled since dynamic conf is not enabled in unit tests."),
+    "--show-thread-dump" -> Right("ThreadStackResponse"))
+
+  test("master --exclude-worker and --remove-excluded-worker") {
+    val excludeArgs =
+      prepareMasterArgs() ++ Array("--exclude-worker", "--worker-ids", getWorkerId())
+    captureOutputAndValidateResponse(excludeArgs, "success: true")
+    val removeExcludedArgs =
+      prepareMasterArgs() ++ Array("--remove-excluded-worker", "--worker-ids", getWorkerId())
+    captureOutputAndValidateResponse(removeExcludedArgs, "success: true")
+  }
+
+  test("master --send-worker-event") {
+    val args = prepareMasterArgs() ++
+      Array("--send-worker-event", "DECOMMISSION", "--worker-ids", getWorkerId())
+    captureOutputAndValidateResponse(args, "success: true")
+  }
+
+  /**
+   * Registers one test named "<command> <flag>" per entry, in the given order. A `Right`
+   * carries the text expected on stdout; a `Left` carries the reason the test is cancelled.
+   * `baseArgs` is evaluated inside the test body, i.e. after `beforeAll` has run.
+   */
+  private def registerFlagTests(command: String, baseArgs: () => Array[String])(
+      cases: (String, Either[String, String])*): Unit = {
+    cases.foreach { case (flag, expectation) =>
+      test(s"$command $flag") {
+        expectation match {
+          case Left(reason) => cancel(reason)
+          case Right(expected) => captureOutputAndValidateResponse(baseArgs() :+ flag, expected)
+        }
+      }
+    }
+  }
+
+  private def prepareMasterArgs(): Array[String] = Array("master", "--cluster", "unit-test")
+
+  private def prepareWorkerArgs(): Array[String] =
+    Array("worker", "--hostport", worker.connectionUrl)
+
+  /** Runs the CLI with `args` and asserts that its stdout contains `stdoutValidationString`. */
+  private def captureOutputAndValidateResponse(
+      args: Array[String],
+      stdoutValidationString: String): Unit = {
+    val buffer = new ByteArrayOutputStream()
+    Console.withOut(new PrintStream(buffer)) {
+      CelebornCli.main(args)
+    }
+    val stdout = buffer.toString
+    assert(stdout.nonEmpty && stdout.contains(stdoutValidationString))
+  }
+
+  /** Builds the `host:rpcPort:pushPort:fetchPort:replicatePort` id of the test worker. */
+  private def getWorkerId(): String = {
+    val (pushPort, fetchPort) = worker.getPushFetchServerPort
+    Seq(
+      worker.workerArgs.host,
+      worker.rpcEnv.address.port,
+      pushPort,
+      fetchPort,
+      worker.replicateServer.getPort).mkString(":")
+  }
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 02650dc1b..c452cbc0c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -990,6 +990,17 @@ object Utils extends Logging {
}
}
+ /**
+  * Evaluates `f` exactly once to obtain a resource, applies `func` to that resource and
+  * closes it afterwards, whether `func` returns normally or throws.
+  *
+  * @param f by-name expression that creates the resource
+  * @param func computation to run against the created resource
+  * @return the value returned by `func`
+  */
+ def tryWithResources[R <: Closeable, U](f: => R)(func: R => U): U = {
+   // `f` is by-name: naming it again would create a second resource that is never closed.
+   val res = f
+   try {
+     func(res)
+   } finally {
+     if (null != res) {
+       res.close()
+     }
+   }
+ }
+
def toTransportMessage(message: Any): Any = {
message match {
case legacy: Message =>
diff --git a/pom.xml b/pom.xml
index 2e06cf2b2..9f91c1552 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
<module>service</module>
<module>master</module>
<module>worker</module>
+ <module>cli</module>
</modules>
<distributionManagement>
@@ -104,6 +105,7 @@
<jackson.version>2.15.3</jackson.version>
<snappy.version>1.1.10.5</snappy.version>
<ap.loader.version>3.0-8</ap.loader.version>
+ <picocli.version>4.7.6</picocli.version>
<!-- Db dependencies -->
<mybatis.version>3.5.15</mybatis.version>
@@ -708,6 +710,11 @@
<version>${bouncycastle.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>info.picocli</groupId>
+ <artifactId>picocli</artifactId>
+ <version>${picocli.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 31816fe2c..ddddcf366 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -80,6 +80,7 @@ object Dependencies {
val httpClient5Version = "5.3.1"
val httpCore5Version = "5.2.4"
val javaxAnnotationApiVersion = "1.3.2"
+ val picocliVersion = "4.7.6"
// For SSL support
val bouncycastleVersion = "1.77"
@@ -203,6 +204,8 @@ object Dependencies {
// SSL support
val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" %
bouncycastleVersion % "test"
val bouncycastleBcpkixJdk18on = "org.bouncycastle" % "bcpkix-jdk18on" %
bouncycastleVersion % "test"
+
+ val picocli = "info.picocli" % "picocli" % picocliVersion
}
object CelebornCommonSettings {
@@ -355,7 +358,8 @@ object CelebornBuild extends sbt.internal.BuildDef {
CelebornClient.client,
CelebornService.service,
CelebornWorker.worker,
- CelebornMaster.master) ++ maybeSparkClientModules ++
maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
+ CelebornMaster.master,
+ CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules
++ maybeMRClientModules ++ maybeWebModules
}
// ThisBuild / parallelExecution := false
@@ -455,6 +459,20 @@ object Utils {
}
}
+object CelebornCli {
+  // sbt project rooted at cli/. It depends on the compile and test configurations of the
+  // common, master, worker and OpenAPI client projects, and adds picocli on top of the
+  // shared unit-test dependencies.
+  lazy val cli = Project("celeborn-cli", file("cli"))
+    .dependsOn(
+      CelebornCommon.common % "test->test;compile->compile",
+      CelebornMaster.master % "test->test;compile->compile",
+      CelebornWorker.worker % "test->test;compile->compile",
+      CelebornOpenApi.openApiClient % "test->test;compile->compile")
+    .settings(
+      commonSettings,
+      libraryDependencies ++= Dependencies.picocli +: commonUnitTestDependencies)
+}
+
object CelebornSpi {
lazy val spi = Project("celeborn-spi", file("spi"))
.settings(
diff --git a/sbin/celeborn-cli b/sbin/celeborn-cli
new file mode 100755
index 000000000..ed89f22f8
--- /dev/null
+++ b/sbin/celeborn-cli
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default CELEBORN_HOME to the parent of the directory that contains this script.
+if [ -z "${CELEBORN_HOME}" ]; then
+  CELEBORN_HOME="$(cd "$(dirname "$0")"/..; pwd)"
+  export CELEBORN_HOME
+fi
+
+# Source the environment script, then hand over to bin/celeborn-class with the CLI main class
+# and every argument passed to this script.
+. "${CELEBORN_HOME}/sbin/load-celeborn-env.sh"
+exec "${CELEBORN_HOME}/bin/celeborn-class" org.apache.celeborn.cli.CelebornCli "$@"