This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new cc26131f8 [CELEBORN-1572] Celeborn CLI initial REST API support
cc26131f8 is described below

commit cc26131f88e845e306ff35435e10e506a2c058f3
Author: Aravind Patnam <[email protected]>
AuthorDate: Thu Sep 5 11:15:16 2024 -0500

    [CELEBORN-1572] Celeborn CLI initial REST API support
    
    ### What changes were proposed in this pull request?
    Introducing the Celeborn CLI (based on this 
[CIP](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI)).
 For the first iteration, this adds support for querying the existing REST API 
endpoints.
    A follow-up will add a layer for external cluster manager support. Further 
improvements, such as pretty-printing, are still needed and can be added in 
subsequent PRs.
    
    ### Why are the changes needed?
    See the 
[CIP](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI).
    
    ### Does this PR introduce _any_ user-facing change?
    yes, new CLI tool.
    
    ### How was this patch tested?
    added UTs and also tested internally.
    
    Closes #2699 from akpatnam25/cli-CELEBORN-1572.
    
    Lead-authored-by: Aravind Patnam <[email protected]>
    Co-authored-by: Aravind Patnam <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 bin/celeborn-class                                 |  11 +
 build/make-distribution.sh                         |  16 +-
 build/release/release.sh                           |   3 +
 cli/pom.xml                                        |  83 +++++++
 .../org/apache/celeborn/cli/CelebornCli.scala      |  47 ++++
 .../apache/celeborn/cli/common/CliLogging.scala    |  40 ++++
 .../celeborn/cli/common/CliVersionProvider.scala   |  52 +++++
 .../apache/celeborn/cli/common/CommonOptions.scala |  74 ++++++
 .../celeborn/cli/config/CliConfigManager.scala     |  87 ++++++++
 .../apache/celeborn/cli/master/MasterOptions.scala |  94 ++++++++
 .../celeborn/cli/master/MasterSubcommand.scala     | 105 +++++++++
 .../celeborn/cli/master/MasterSubcommandImpl.scala | 187 ++++++++++++++++
 .../apache/celeborn/cli/worker/WorkerOptions.scala |  73 ++++++
 .../celeborn/cli/worker/WorkerSubcommand.scala     |  83 +++++++
 .../celeborn/cli/worker/WorkerSubcommandImpl.scala |  78 +++++++
 .../celeborn/cli/TestCelebornCliCommands.scala     | 247 +++++++++++++++++++++
 .../org/apache/celeborn/common/util/Utils.scala    |  11 +
 pom.xml                                            |   7 +
 project/CelebornBuild.scala                        |  20 +-
 sbin/celeborn-cli                                  |  24 ++
 20 files changed, 1340 insertions(+), 2 deletions(-)

diff --git a/bin/celeborn-class b/bin/celeborn-class
index 5bb3791ff..32db4fe9d 100755
--- a/bin/celeborn-class
+++ b/bin/celeborn-class
@@ -46,6 +46,9 @@ do
   if [ "$i" == "org.apache.celeborn.service.deploy.worker.Worker" ] ; then
     LAUNCH_CLASS=org.apache.celeborn.service.deploy.worker.Worker
   fi
+  if [ "$i" == "org.apache.celeborn.cli.CelebornCli" ] ; then
+    LAUNCH_CLASS=org.apache.celeborn.cli.CelebornCli
+  fi
 done
 
 if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.service.deploy.master.Master" ] 
; then
@@ -64,6 +67,14 @@ if [ "${LAUNCH_CLASS}" == 
"org.apache.celeborn.service.deploy.worker.Worker" ] ;
   fi
 fi
 
+if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.cli.CelebornCli" ] ; then
+  if [ -d "${CELEBORN_HOME}/cli-jars" ]; then
+    CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli-jars"
+  else
+    CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli/target"
+  fi
+fi
+
 if [ ! -d "$CELEBORN_JARS_DIR" ]; then
   echo "Failed to find CELEBORN jars directory ($CELEBORN_JARS_DIR)." 1>&2
   echo "You need to build CELEBORN with the target \"package\" before running 
this program." 1>&2
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 2f0c9aa29..3133c2f3c 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -138,7 +138,7 @@ function build_service {
   # Store the command as an array because $MVN variable might have spaces in 
it.
   # Normal quoting tricks don't work.
   # See: http://mywiki.wooledge.org/BashFAQ/050
-  BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker -am $@)
+  BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli -am 
$@)
 
   # Actually build the jar
   echo -e "\nBuilding with..."
@@ -149,6 +149,7 @@ function build_service {
   mkdir -p "$DIST_DIR/jars"
   mkdir -p "$DIST_DIR/master-jars"
   mkdir -p "$DIST_DIR/worker-jars"
+  mkdir -p "$DIST_DIR/cli-jars"
 
   ## Copy master jars
   cp "$PROJECT_DIR"/master/target/celeborn-master_$SCALA_VERSION-$VERSION.jar 
"$DIST_DIR/master-jars/"
@@ -162,6 +163,12 @@ function build_service {
   for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do
     (cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .)
   done
+  ## Copy cli jars
+  cp "$PROJECT_DIR"/cli/target/celeborn-cli_$SCALA_VERSION-$VERSION.jar 
"$DIST_DIR/cli-jars/"
+  cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar 
"$DIST_DIR/jars/"
+  for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
+    (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
+  done
 }
 
 function build_spark_client {
@@ -273,6 +280,7 @@ function sbt_build_service {
   mkdir -p "$DIST_DIR/jars"
   mkdir -p "$DIST_DIR/master-jars"
   mkdir -p "$DIST_DIR/worker-jars"
+  mkdir -p "$DIST_DIR/cli-jars"
 
   ## Copy master jars
   cp 
"$PROJECT_DIR"/master/target/scala-$SCALA_VERSION/celeborn-master_$SCALA_VERSION-$VERSION.jar
 "$DIST_DIR/master-jars/"
@@ -286,6 +294,12 @@ function sbt_build_service {
   for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do
     (cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .)
   done
+  ## Copy cli jars
+  cp 
"$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/celeborn-cli_$SCALA_VERSION-$VERSION.jar
 "$DIST_DIR/cli-jars/"
+  cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar 
"$DIST_DIR/jars/"
+  for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
+    (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
+  done
 }
 
 function sbt_build_client {
diff --git a/build/release/release.sh b/build/release/release.sh
index ae381fedd..f12342b57 100755
--- a/build/release/release.sh
+++ b/build/release/release.sh
@@ -133,6 +133,9 @@ upload_nexus_staging() {
 
   echo "Deploying celeborn-spi"
   ${PROJECT_DIR}/build/sbt "clean;celeborn-spi/publishSigned"
+
+  echo "Deploying celeborn-cli"
+  ${PROJECT_DIR}/build/sbt "clean;celeborn-cli/publishSigned"
 }
 
 finalize_svn() {
diff --git a/cli/pom.xml b/cli/pom.xml
new file mode 100644
index 000000000..b8773149a
--- /dev/null
+++ b/cli/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-cli_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn CLI</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>info.picocli</groupId>
+      <artifactId>picocli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-openapi-client_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-worker_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-master_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala
new file mode 100644
index 000000000..fb2081d7f
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli
+import picocli.CommandLine
+import picocli.CommandLine.Command
+
+import org.apache.celeborn.cli.common.{CliLogging, CliVersionProvider}
+import org.apache.celeborn.cli.master.MasterSubcommandImpl
+import org.apache.celeborn.cli.worker.WorkerSubcommandImpl
+@Command(
+  name = "celeborn-cli",
+  versionProvider = classOf[CliVersionProvider],
+  mixinStandardHelpOptions = true,
+  description = Array("@|bold Scala|@ Celeborn CLI"),
+  subcommands = Array(
+    classOf[MasterSubcommandImpl],
+    classOf[WorkerSubcommandImpl]))
+class CelebornCli extends Runnable with CliLogging {
+  override def run(): Unit = {
+    logError(
+      "Master or Worker subcommand needs to be provided. Please run -h to see 
the usage info.")
+  }
+}
+
+object CelebornCli {
+  def main(args: Array[String]): Unit = {
+    val cmd = new CommandLine(new CelebornCli())
+    cmd.setOptionsCaseInsensitive(false)
+    cmd.setSubcommandsCaseInsensitive(false)
+    cmd.execute(args: _*) // run the configured instance, not a fresh default one
+  }
+}
diff --git a/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala
new file mode 100644
index 000000000..d389a1452
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.common
+
+import Console.{GREEN, RED, RESET}
+
+trait CliLogging {
+
+  def log(msg: String): Unit = {
+    Console.println(msg)
+  }
+
+  def log(obj: Any): Unit = {
+    Console.println(obj)
+  }
+
+  def logInfo(msg: String): Unit = {
+    Console.println(s"${RESET}${GREEN}${msg}${RESET}") // reset so green does not leak
+  }
+
+  def logError(msg: String): Unit = {
+    Console.println(s"${RESET}${RED}${msg}${RESET}") // reset so red does not leak
+  }
+
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala
new file mode 100644
index 000000000..2dd48267b
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.common
+
+import java.nio.file.{Files, Paths}
+
+import scala.io.Source
+
+import picocli.CommandLine.IVersionProvider
+
+import org.apache.celeborn.common.util.Utils
+
+class CliVersionProvider extends IVersionProvider with CliLogging {
+
+  private val versionPattern = """Celeborn\s+\S+""".r
+  private val prefix = "Celeborn CLI"
+
+  override def getVersion: Array[String] = {
+    val versionFile = Paths.get(sys.env.getOrElse("CELEBORN_HOME", "") + 
"/RELEASE")
+
+    if (Files.exists(versionFile)) {
+      Utils.tryWithResources(Source.fromFile(versionFile.toFile)) { source =>
+        source.getLines().find(line => 
versionPattern.findFirstIn(line).isDefined) match {
+          case Some(matchingLine) =>
+            Array(s"$prefix - ${versionPattern.findFirstIn(matchingLine).get}")
+          case _ =>
+            logInfo("Could not resolve version of Celeborn since RELEASE file 
did not contain version info.")
+            Array(prefix)
+        }
+      }
+    } else {
+      logInfo(
+        "Could not resolve version of Celeborn since no RELEASE file was found 
in $CELEBORN_HOME.")
+      Array(prefix)
+    }
+  }
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
new file mode 100644
index 000000000..507d5374f
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.common
+
+import picocli.CommandLine.{Command, Option, Spec}
+import picocli.CommandLine.Model.CommandSpec
+
+@Command
+class CommonOptions {
+
+  @Spec var spec: CommandSpec = _ // injected by picocli
+
+  @Option(
+    names = Array("--hostport"),
+    paramLabel = "host:port",
+    description = Array("The host and http port"))
+  private[cli] var hostPort: String = _
+
+  @Option(
+    names = Array("--host-list"),
+    paramLabel = "h1,h2,h3...",
+    description = Array("List of hosts to pass to the command"))
+  private[cli] var hostList: String = _
+
+  @Option(
+    names = Array("--worker-ids"),
+    paramLabel = "w1,w2,w3...",
+    description =
+      Array("List of workerIds to pass to the command. Each worker should be 
in the format" +
+        " host:rpcPort:pushPort:fetchPort:replicatePort."))
+  private[cli] var workerIds: String = _
+
+  @Option(
+    names = Array("--cluster"),
+    paramLabel = "cluster_alias",
+    description = Array("The alias of the cluster to use to query masters"))
+  private[cli] var cluster: String = _
+
+  // Required for getting dynamic config info
+  @Option(
+    names = Array("--config-level"),
+    paramLabel = "level",
+    description = Array("The config level of the dynamic configs"))
+  private[cli] var configLevel: String = _
+
+  // Required for getting dynamic config info
+  @Option(
+    names = Array("--config-tenant"),
+    paramLabel = "tenant_id",
+    description = Array("The tenant id of TENANT or TENANT_USER level."))
+  private[cli] var configTenant: String = _
+
+  // Required for getting dynamic config info
+  @Option(
+    names = Array("--config-name"),
+    paramLabel = "username",
+    description = Array("The username of the TENANT_USER level."))
+  private[cli] var configName: String = _
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala
new file mode 100644
index 000000000..873a5cbf1
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.config
+
+import java.io.{File, FileInputStream, FileOutputStream}
+import java.time.LocalDateTime
+import java.util.Properties
+
+import org.apache.celeborn.cli.common.CliLogging
+import org.apache.celeborn.common.util.Utils
+
+case class CliConfig(cliConfigData: Map[String, String])
+
+object CliConfigManager {
+  val cliConfigFilePath: String = s"${sys.env("HOME")}/.celeborn-cli.conf"
+}
+
+class CliConfigManager extends CliLogging {
+
+  private val properties = new Properties()
+
+  def loadConfig(): Option[CliConfig] = {
+    val file = new File(CliConfigManager.cliConfigFilePath)
+    if (!file.exists()) {
+      None
+    } else {
+      Utils.tryWithResources(new FileInputStream(file)) { inputStream =>
+        properties.load(inputStream)
+        
Some(CliConfig(properties.stringPropertyNames().toArray.map(_.toString).map { 
key =>
+          key -> properties.getProperty(key)
+        }.toMap))
+      }
+    }
+  }
+
+  private def saveConfig(cliConfig: CliConfig): Unit = {
+    try {
+      val file = new File(CliConfigManager.cliConfigFilePath)
+      if (!file.exists()) {
+        file.getParentFile.mkdirs()
+        file.createNewFile()
+      }
+      properties.clear()
+      val outputStream = new FileOutputStream(file)
+      Utils.tryWithResources(outputStream) { os =>
+        cliConfig.cliConfigData.foreach { case (key, value) =>
+          properties.setProperty(key, value)
+        }
+        properties.store(os, s"Last updated conf at ${LocalDateTime.now()}")
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Error saving config: ${e.getMessage}")
+    }
+  }
+
+  def add(key: String, value: String): Unit = {
+    val config = loadConfig().getOrElse(CliConfig(Map()))
+    val updatedConfig = config.copy(cliConfigData = config.cliConfigData + 
(key -> value))
+    saveConfig(updatedConfig)
+  }
+
+  def remove(key: String): Unit = {
+    val config = loadConfig().getOrElse(CliConfig(Map()))
+    val updatedConfig = config.copy(cliConfigData = config.cliConfigData - key)
+    saveConfig(updatedConfig)
+  }
+
+  def get(key: String): Option[String] = {
+    loadConfig().flatMap(config => config.cliConfigData.get(key))
+  }
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
new file mode 100644
index 000000000..b22f0bb3f
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import picocli.CommandLine.Option
+
+final class MasterOptions {
+
+  @Option(names = Array("--show-masters-info"), description = Array("Show 
master group info"))
+  private[master] var showMastersInfo: Boolean = _
+
+  @Option(names = Array("--show-cluster-apps"), description = Array("Show 
cluster applications"))
+  private[master] var showClusterApps: Boolean = _
+
+  @Option(names = Array("--show-cluster-shuffles"), description = Array("Show 
cluster shuffles"))
+  private[master] var showClusterShuffles: Boolean = _
+
+  @Option(
+    names = Array("--show-top-disk-used-apps"),
+    description = Array("Show top disk used apps"))
+  private[master] var showTopDiskUsedApps: Boolean = _
+
+  @Option(names = Array("--exclude-worker"), description = Array("Exclude 
workers by ID"))
+  private[master] var excludeWorkers: Boolean = _
+
+  @Option(
+    names = Array("--remove-excluded-worker"),
+    description = Array("Remove excluded workers by ID"))
+  private[master] var removeExcludedWorkers: Boolean = _
+
+  @Option(
+    names = Array("--send-worker-event"),
+    paramLabel = "IMMEDIATELY | DECOMMISSION | DECOMMISSION_THEN_IDLE | 
GRACEFUL",
+    description = Array("Send an event to a worker"))
+  private[master] var sendWorkerEvent: String = _
+
+  @Option(
+    names = Array("--show-worker-event-info"),
+    description = Array("Show worker event information"))
+  private[master] var showWorkerEventInfo: Boolean = _
+
+  @Option(names = Array("--show-lost-workers"), description = Array("Show lost 
workers"))
+  private[master] var showLostWorkers: Boolean = _
+
+  @Option(names = Array("--show-excluded-workers"), description = Array("Show 
excluded workers"))
+  private[master] var showExcludedWorkers: Boolean = _
+
+  @Option(names = Array("--show-shutdown-workers"), description = Array("Show 
shutdown workers"))
+  private[master] var showShutdownWorkers: Boolean = _
+
+  @Option(
+    names = Array("--show-lifecycle-managers"),
+    description = Array("Show lifecycle managers"))
+  private[master] var showLifecycleManagers: Boolean = _
+
+  @Option(names = Array("--show-workers"), description = Array("Show 
registered workers"))
+  private[master] var showWorkers: Boolean = _
+
+  @Option(names = Array("--show-conf"), description = Array("Show master 
conf"))
+  private[master] var showConf: Boolean = _
+
+  @Option(names = Array("--show-dynamic-conf"), description = Array("Show 
dynamic master conf"))
+  private[master] var showDynamicConf: Boolean = _
+
+  @Option(names = Array("--show-thread-dump"), description = Array("Show 
master thread dump"))
+  private[master] var showThreadDump: Boolean = _
+
+  @Option(
+    names = Array("--add-cluster-alias"),
+    paramLabel = "alias",
+    description = Array("Add alias to use in the cli for the given set of 
masters"))
+  private[master] var addClusterAlias: String = _
+
+  @Option(
+    names = Array("--remove-cluster-alias"),
+    paramLabel = "alias",
+    description = Array("Remove alias to use in the cli for the given set of 
masters"))
+  private[master] var removeClusterAlias: String = _
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
new file mode 100644
index 000000000..756ebb89d
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import java.util
+
+import picocli.CommandLine.{ArgGroup, Mixin, ParameterException, 
ParentCommand, Spec}
+import picocli.CommandLine.Model.CommandSpec
+
+import org.apache.celeborn.cli.CelebornCli
+import org.apache.celeborn.cli.common.{CliLogging, CommonOptions}
+import org.apache.celeborn.cli.config.CliConfigManager
+import org.apache.celeborn.rest.v1.master.{ApplicationApi, ConfApi, 
DefaultApi, MasterApi, ShuffleApi, WorkerApi}
+import org.apache.celeborn.rest.v1.master.invoker.ApiClient
+import org.apache.celeborn.rest.v1.model._
+
+trait MasterSubcommand extends CliLogging {
+
+  @ParentCommand
+  private var celebornCli: CelebornCli = _
+
+  @ArgGroup(exclusive = true, multiplicity = "1")
+  private[master] var masterOptions: MasterOptions = _
+
+  @Mixin
+  private[master] var commonOptions: CommonOptions = _
+
+  @Spec
+  private[master] var spec: CommandSpec = _
+
+  private[master] val cliConfigManager = new CliConfigManager
+  private def apiClient: ApiClient = {
+    val connectionUrl =
+      if (commonOptions.hostPort != null && commonOptions.hostPort.nonEmpty) {
+        commonOptions.hostPort
+      } else {
+        val endpoints = cliConfigManager.get(commonOptions.cluster)
+        endpoints.getOrElse("").split(",")(0)
+      }
+    if (connectionUrl != null && connectionUrl.nonEmpty) {
+      log(s"Using connectionUrl: $connectionUrl")
+      new ApiClient().setBasePath(s"http://$connectionUrl")
+    } else {
+      throw new ParameterException(
+        spec.commandLine(),
+        "No valid connection url found, please provide either --hostport or " 
+ "valid cluster alias.")
+    }
+  }
+  private[master] def applicationApi: ApplicationApi = new 
ApplicationApi(apiClient)
+  private[master] def confApi: ConfApi = new ConfApi(apiClient)
+  private[master] def defaultApi: DefaultApi = new DefaultApi(apiClient)
+  private[master] def masterApi: MasterApi = new MasterApi(apiClient)
+  private[master] def shuffleApi: ShuffleApi = new ShuffleApi(apiClient)
+  private[master] def workerApi: WorkerApi = new WorkerApi(apiClient)
+
+  private[master] def runShowMastersInfo: MasterInfoResponse
+
+  private[master] def runShowClusterApps: ApplicationsHeartbeatResponse
+
+  private[master] def runShowClusterShuffles: ShufflesResponse
+
+  private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse
+
+  private[master] def runExcludeWorkers: HandleResponse
+
+  private[master] def runRemoveExcludedWorkers: HandleResponse
+
+  private[master] def runSendWorkerEvent: HandleResponse
+
+  private[master] def runShowWorkerEventInfo: WorkerEventsResponse
+
+  private[master] def runShowLostWorkers: Seq[WorkerTimestampData]
+
+  private[master] def runShowExcludedWorkers: Seq[WorkerData]
+
+  private[master] def runShowShutdownWorkers: Seq[WorkerData]
+
+  private[master] def runShowLifecycleManagers: HostnamesResponse
+
+  private[master] def runShowWorkers: WorkersResponse
+
+  private[master] def getWorkerIds: util.List[WorkerId]
+
+  private[master] def runShowConf: ConfResponse
+
+  private[master] def runShowDynamicConf: DynamicConfigResponse
+
+  private[master] def runShowThreadDump: ThreadStackResponse
+
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
new file mode 100644
index 000000000..50a643ca8
--- /dev/null
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.master
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import picocli.CommandLine.{Command, ParameterException}
+
+import org.apache.celeborn.cli.config.CliConfigManager
+import org.apache.celeborn.rest.v1.model._
+import org.apache.celeborn.rest.v1.model.SendWorkerEventRequest.EventTypeEnum
+
+/**
+ * picocli `master` subcommand. Each `run*` method issues one request through the REST
+ * API wrappers inherited from [[MasterSubcommand]], or adds/removes a cluster alias via
+ * `cliConfigManager`.
+ */
+@Command(name = "master", mixinStandardHelpOptions = true)
+class MasterSubcommandImpl extends Runnable with MasterSubcommand {
+
+  /** Runs the action of every option that was set and logs the value it returns. */
+  override def run(): Unit = {
+    if (masterOptions.showMastersInfo) log(runShowMastersInfo)
+    if (masterOptions.showClusterApps) log(runShowClusterApps)
+    if (masterOptions.showClusterShuffles) log(runShowClusterShuffles)
+    if (masterOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
+    if (masterOptions.excludeWorkers) log(runExcludeWorkers)
+    if (masterOptions.removeExcludedWorkers) log(runRemoveExcludedWorkers)
+    if (masterOptions.sendWorkerEvent != null && masterOptions.sendWorkerEvent.nonEmpty)
+      log(runSendWorkerEvent)
+    if (masterOptions.showWorkerEventInfo) log(runShowWorkerEventInfo)
+    if (masterOptions.showLostWorkers) log(runShowLostWorkers)
+    if (masterOptions.showExcludedWorkers) log(runShowExcludedWorkers)
+    if (masterOptions.showShutdownWorkers) log(runShowShutdownWorkers)
+    if (masterOptions.showLifecycleManagers) log(runShowLifecycleManagers)
+    if (masterOptions.showWorkers) log(runShowWorkers)
+    if (masterOptions.showConf) log(runShowConf)
+    if (masterOptions.showDynamicConf) log(runShowDynamicConf)
+    if (masterOptions.showThreadDump) log(runShowThreadDump)
+    // The alias commands return Unit, so there is nothing to log for them here.
+    if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty)
+      runAddClusterAlias
+    if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty)
+      runRemoveClusterAlias
+  }
+
+  private[master] def runShowMastersInfo: MasterInfoResponse = masterApi.getMasterGroupInfo
+
+  private[master] def runShowClusterApps: ApplicationsHeartbeatResponse =
+    applicationApi.getApplications
+
+  private[master] def runShowClusterShuffles: ShufflesResponse = shuffleApi.getShuffles
+
+  private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse =
+    applicationApi.getApplicationsDiskUsageSnapshots
+
+  /** Sends an exclude request whose `add` list is the parsed `--worker-ids`. */
+  private[master] def runExcludeWorkers: HandleResponse = {
+    val workerIds = getWorkerIds
+    val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds)
+    logInfo(s"Sending exclude worker requests to workers: $workerIds")
+    workerApi.excludeWorker(excludeWorkerRequest)
+  }
+
+  /** Sends an exclude request whose `remove` list is the parsed `--worker-ids`. */
+  private[master] def runRemoveExcludedWorkers: HandleResponse = {
+    val workerIds = getWorkerIds
+    val removeExcludeWorkerRequest = new ExcludeWorkerRequest().remove(workerIds)
+    logInfo(s"Sending remove exclude worker requests to workers: $workerIds")
+    workerApi.excludeWorker(removeExcludeWorkerRequest)
+  }
+
+  /**
+   * Sends the requested worker event to the parsed worker ids.
+   *
+   * The event name is matched case-insensitively against `EventTypeEnum`.
+   *
+   * @throws ParameterException if the event name is not an `EventTypeEnum` constant, or
+   *                            the worker ids are missing or malformed
+   */
+  private[master] def runSendWorkerEvent: HandleResponse = {
+    val eventType = {
+      try {
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale
+        // (e.g. a Turkish locale would map 'i' to a dotted capital and miss the enum).
+        EventTypeEnum.valueOf(masterOptions.sendWorkerEvent.trim.toUpperCase(util.Locale.ROOT))
+      } catch {
+        case _: IllegalArgumentException => throw new ParameterException(
+            spec.commandLine(),
+            "Worker event type must be " +
+              "IMMEDIATELY, DECOMMISSION, DECOMMISSION_THEN_IDLE, or GRACEFUL")
+      }
+    }
+    val workerIds = getWorkerIds
+    val sendWorkerEventRequest =
+      new SendWorkerEventRequest().workers(workerIds).eventType(eventType)
+    logInfo(s"Sending workerEvent $eventType to workers: $workerIds")
+    workerApi.sendWorkerEvent(sendWorkerEventRequest)
+  }
+
+  private[master] def runShowWorkerEventInfo: WorkerEventsResponse = workerApi.getWorkerEvents
+
+  /** Lost workers from the workers response, sorted by host; logs a notice when empty. */
+  private[master] def runShowLostWorkers: Seq[WorkerTimestampData] = {
+    // Option(...) guards against a response whose list field is null.
+    val lostWorkers = Option(runShowWorkers.getLostWorkers)
+      .map(_.asScala.toSeq)
+      .getOrElse(Seq.empty[WorkerTimestampData])
+    if (lostWorkers.isEmpty) {
+      log("No lost workers found.")
+      Seq.empty[WorkerTimestampData]
+    } else {
+      lostWorkers.sortBy(_.getWorker.getHost)
+    }
+  }
+
+  /** Excluded workers from the workers response, sorted by host; logs a notice when empty. */
+  private[master] def runShowExcludedWorkers: Seq[WorkerData] = {
+    val excludedWorkers = Option(runShowWorkers.getExcludedWorkers)
+      .map(_.asScala.toSeq)
+      .getOrElse(Seq.empty[WorkerData])
+    if (excludedWorkers.isEmpty) {
+      log("No excluded workers found.")
+      Seq.empty[WorkerData]
+    } else {
+      excludedWorkers.sortBy(_.getHost)
+    }
+  }
+
+  /** Shutdown workers from the workers response, sorted by host; logs a notice when empty. */
+  private[master] def runShowShutdownWorkers: Seq[WorkerData] = {
+    val shutdownWorkers = Option(runShowWorkers.getShutdownWorkers)
+      .map(_.asScala.toSeq)
+      .getOrElse(Seq.empty[WorkerData])
+    if (shutdownWorkers.isEmpty) {
+      log("No shutdown workers found.")
+      Seq.empty[WorkerData]
+    } else {
+      shutdownWorkers.sortBy(_.getHost)
+    }
+  }
+
+  private[master] def runShowLifecycleManagers: HostnamesResponse =
+    applicationApi.getApplicationHostNames
+
+  private[master] def runShowWorkers: WorkersResponse = workerApi.getWorkers
+
+  /**
+   * Parses `commonOptions.workerIds`, a comma separated list of
+   * `host:rpcPort:pushPort:fetchPort:replicatePort` entries, into `WorkerId` models.
+   * Whitespace around entries is ignored and empty entries are skipped.
+   *
+   * @throws ParameterException if the option is missing or blank, or an entry is malformed
+   */
+  private[master] def getWorkerIds: util.List[WorkerId] = {
+    val workerIds = commonOptions.workerIds
+    if (workerIds == null || workerIds.trim.isEmpty) {
+      throw new ParameterException(
+        spec.commandLine(),
+        "Host list must be provided for this command.")
+    }
+    workerIds
+      .split(",")
+      .map(_.trim)
+      .filter(_.nonEmpty)
+      .map(parseWorkerId)
+      .toList
+      .asJava
+  }
+
+  /**
+   * Parses a single `host:rpcPort:pushPort:fetchPort:replicatePort` entry. A missing field
+   * or a non-numeric port is reported as a usage error instead of escaping as an
+   * ArrayIndexOutOfBoundsException / NumberFormatException.
+   */
+  private def parseWorkerId(workerId: String): WorkerId = {
+    def malformed: ParameterException = new ParameterException(
+      spec.commandLine(),
+      s"Invalid worker id '$workerId', expected host:rpcPort:pushPort:fetchPort:replicatePort.")
+
+    val fields = workerId.split(":")
+    // Fields beyond the fifth are ignored, as before.
+    if (fields.length < 5) throw malformed
+    try {
+      new WorkerId()
+        .host(fields(0))
+        .rpcPort(fields(1).toInt)
+        .pushPort(fields(2).toInt)
+        .fetchPort(fields(3).toInt)
+        .replicatePort(fields(4).toInt)
+    } catch {
+      case _: NumberFormatException => throw malformed
+    }
+  }
+
+  private[master] def runShowConf: ConfResponse = confApi.getConf
+
+  private[master] def runShowDynamicConf: DynamicConfigResponse =
+    confApi.getDynamicConf(
+      commonOptions.configLevel,
+      commonOptions.configTenant,
+      commonOptions.configName)
+
+  private[master] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump
+
+  /**
+   * Stores `--host-list` under the alias given by `--add-cluster-alias`.
+   *
+   * @throws ParameterException if no host list was supplied
+   */
+  private[master] def runAddClusterAlias: Unit = {
+    val aliasToAdd = masterOptions.addClusterAlias
+    val hosts = commonOptions.hostList
+    if (hosts == null || hosts.isEmpty) {
+      throw new ParameterException(
+        spec.commandLine(),
+        "Host list must be supplied via --host-list to add to alias.")
+    }
+    cliConfigManager.add(aliasToAdd, hosts)
+    logInfo(s"Cluster alias $aliasToAdd added to ${CliConfigManager.cliConfigFilePath}. " +
+      s"You can now use the --cluster command with this alias.")
+  }
+
+  /** Removes the alias given by `--remove-cluster-alias` via `cliConfigManager`. */
+  private[master] def runRemoveClusterAlias: Unit = {
+    val aliasToRemove = masterOptions.removeClusterAlias
+    cliConfigManager.remove(aliasToRemove)
+    logInfo(s"Cluster alias $aliasToRemove removed.")
+  }
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
new file mode 100644
index 000000000..a0a1c25a7
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.worker
+
+import picocli.CommandLine.Option
+
+/**
+ * Command line options of the `worker` subcommand, bound by picocli through the
+ * `@Option` annotations. Boolean fields are flags; `exitType` carries a string value
+ * and keeps its `_` default (null) when the option is not given.
+ */
+final class WorkerOptions {
+
+  @Option(names = Array("--show-worker-info"), description = Array("Show 
worker info"))
+  private[worker] var showWorkerInfo: Boolean = _
+
+  @Option(
+    names = Array("--show-apps-on-worker"),
+    description = Array("Show applications running on the worker"))
+  private[worker] var showAppsOnWorker: Boolean = _
+
+  @Option(
+    names = Array("--show-shuffles-on-worker"),
+    description = Array("Show shuffles running on the worker"))
+  private[worker] var showShufflesOnWorker: Boolean = _
+
+  @Option(
+    names = Array("--show-top-disk-used-apps"),
+    description = Array("Show top disk used applications"))
+  private[worker] var showTopDiskUsedApps: Boolean = _
+
+  @Option(
+    names = Array("--show-partition-location-info"),
+    description = Array("Show partition location information"))
+  private[worker] var showPartitionLocationInfo: Boolean = _
+
+  @Option(names = Array("--show-unavailable-peers"), description = Array("Show 
unavailable peers"))
+  private[worker] var showUnavailablePeers: Boolean = _
+
+  @Option(names = Array("--is-shutdown"), description = Array("Check if the 
system is shutdown"))
+  private[worker] var isShutdown: Boolean = _
+
+  @Option(
+    names = Array("--is-registered"),
+    description = Array("Check if the system is registered"))
+  private[worker] var isRegistered: Boolean = _
+
+  // The only option here that takes a value; shown as `exit_type` in the usage help.
+  @Option(
+    names = Array("--exit"),
+    paramLabel = "exit_type",
+    description = Array("Exit the application with a specified type"))
+  private[worker] var exitType: String = _
+
+  @Option(names = Array("--show-conf"), description = Array("Show worker 
conf"))
+  private[worker] var showConf: Boolean = _
+
+  @Option(names = Array("--show-dynamic-conf"), description = Array("Show 
dynamic worker conf"))
+  private[worker] var showDynamicConf: Boolean = _
+
+  @Option(names = Array("--show-thread-dump"), description = Array("Show 
worker thread dump"))
+  private[worker] var showThreadDump: Boolean = _
+
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
new file mode 100644
index 000000000..7690c364e
--- /dev/null
+++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.worker
+
+import picocli.CommandLine.{ArgGroup, Mixin, ParameterException, 
ParentCommand, Spec}
+import picocli.CommandLine.Model.CommandSpec
+
+import org.apache.celeborn.cli.CelebornCli
+import org.apache.celeborn.cli.common.{CliLogging, CommonOptions}
+import org.apache.celeborn.rest.v1.model._
+import org.apache.celeborn.rest.v1.worker.{ApplicationApi, ConfApi, 
DefaultApi, ShuffleApi, WorkerApi}
+import org.apache.celeborn.rest.v1.worker.invoker.ApiClient
+
+/**
+ * Shared state and contract of the `worker` subcommand: the picocli-injected fields,
+ * the REST client wiring, and the abstract operations a concrete command implements.
+ */
+trait WorkerSubcommand extends CliLogging {
+
+  @ParentCommand
+  private var celebornCli: CelebornCli = _
+
+  // Exclusive group with multiplicity "1": exactly one worker option per invocation.
+  @ArgGroup(exclusive = true, multiplicity = "1")
+  private[worker] var workerOptions: WorkerOptions = _
+
+  @Mixin
+  private[worker] var commonOptions: CommonOptions = _
+
+  @Spec
+  private[worker] var spec: CommandSpec = _
+
+  /**
+   * Builds a REST client whose base path is `http://<--hostport value>`.
+   *
+   * @throws ParameterException if `--hostport` is missing or empty
+   */
+  private[worker] def apiClient: ApiClient = {
+    if (commonOptions.hostPort != null && commonOptions.hostPort.nonEmpty) {
+      log(s"Using connectionUrl: ${commonOptions.hostPort}")
+      new ApiClient().setBasePath(s"http://${commonOptions.hostPort}")
+    } else {
+      throw new ParameterException(
+        spec.commandLine(),
+        "No valid connection url found, please provide --hostport.")
+    }
+  }
+
+  // Each accessor is a `def`: every call builds a new wrapper from a new `apiClient`.
+  private[worker] def applicationApi: ApplicationApi = new ApplicationApi(apiClient)
+  private[worker] def confApi: ConfApi = new ConfApi(apiClient)
+  private[worker] def defaultApi: DefaultApi = new DefaultApi(apiClient)
+  private[worker] def shuffleApi: ShuffleApi = new ShuffleApi(apiClient)
+  private[worker] def workerApi: WorkerApi = new WorkerApi(apiClient)
+
+  // Abstract operations, one per worker option; implemented by the concrete command.
+  private[worker] def runShowWorkerInfo: WorkerInfoResponse
+
+  private[worker] def runShowAppsOnWorker: ApplicationsResponse
+
+  private[worker] def runShowShufflesOnWorker: ShufflesResponse
+
+  private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse
+
+  private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse
+
+  private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse
+
+  private[worker] def runIsShutdown: Boolean
+
+  private[worker] def runIsRegistered: Boolean
+
+  private[worker] def runExit: HandleResponse
+
+  private[worker] def runShowConf: ConfResponse
+
+  private[worker] def runShowDynamicConf: DynamicConfigResponse
+
+  private[worker] def runShowThreadDump: ThreadStackResponse
+
+}
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
new file mode 100644
index 000000000..eee9e56dd
--- /dev/null
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli.worker
+
+import picocli.CommandLine.Command
+
+import org.apache.celeborn.rest.v1.model._
+import org.apache.celeborn.rest.v1.model.WorkerExitRequest.TypeEnum
+
+/**
+ * picocli `worker` subcommand. Each `run*` method issues one request through the REST
+ * API wrappers inherited from [[WorkerSubcommand]].
+ */
+@Command(name = "worker", mixinStandardHelpOptions = true)
+class WorkerSubcommandImpl extends Runnable with WorkerSubcommand {
+
+  /** Runs the action of every option that was set and logs the value it returns. */
+  override def run(): Unit = {
+    if (workerOptions.showWorkerInfo) log(runShowWorkerInfo)
+    if (workerOptions.showAppsOnWorker) log(runShowAppsOnWorker)
+    if (workerOptions.showShufflesOnWorker) log(runShowShufflesOnWorker)
+    if (workerOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
+    if (workerOptions.showPartitionLocationInfo) log(runShowPartitionLocationInfo)
+    if (workerOptions.showUnavailablePeers) log(runShowUnavailablePeers)
+    if (workerOptions.isShutdown) log(runIsShutdown)
+    if (workerOptions.isRegistered) log(runIsRegistered)
+    if (workerOptions.exitType != null && workerOptions.exitType.nonEmpty) log(runExit)
+    if (workerOptions.showConf) log(runShowConf)
+    if (workerOptions.showDynamicConf) log(runShowDynamicConf)
+    if (workerOptions.showThreadDump) log(runShowThreadDump)
+  }
+
+  private[worker] def runShowWorkerInfo: WorkerInfoResponse = workerApi.getWorkerInfo
+
+  private[worker] def runShowAppsOnWorker: ApplicationsResponse =
+    applicationApi.getApplicationList
+
+  private[worker] def runShowShufflesOnWorker: ShufflesResponse = shuffleApi.getShuffles
+
+  private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse =
+    applicationApi.getApplicationsDiskUsage
+
+  private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse =
+    shuffleApi.getShufflePartitions
+
+  private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse =
+    workerApi.unavailablePeers()
+
+  // Both flags are read from the worker info response.
+  private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown
+
+  private[worker] def runIsRegistered: Boolean = runShowWorkerInfo.getIsRegistered
+
+  /**
+   * Asks the worker to exit with the type given by `--exit`.
+   *
+   * The type is matched case-insensitively against `TypeEnum`.
+   *
+   * @throws picocli.CommandLine.ParameterException if the value is not a `TypeEnum` constant
+   */
+  private[worker] def runExit: HandleResponse = {
+    val workerExitType: TypeEnum =
+      try {
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+        TypeEnum.valueOf(workerOptions.exitType.trim.toUpperCase(java.util.Locale.ROOT))
+      } catch {
+        case _: IllegalArgumentException =>
+          // Report a usage error instead of letting the raw exception escape.
+          val allowed = TypeEnum.values().mkString(", ")
+          throw new picocli.CommandLine.ParameterException(
+            spec.commandLine(),
+            s"Worker exit type must be one of: $allowed")
+      }
+    val workerExitRequest: WorkerExitRequest = new WorkerExitRequest().`type`(workerExitType)
+    logInfo(s"Sending worker exit type: ${workerExitType.getValue}")
+    workerApi.workerExit(workerExitRequest)
+  }
+
+  private[worker] def runShowConf: ConfResponse = confApi.getConf
+
+  private[worker] def runShowDynamicConf: DynamicConfigResponse =
+    confApi.getDynamicConf(
+      commonOptions.configLevel,
+      commonOptions.configTenant,
+      commonOptions.configName)
+
+  private[worker] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump
+}
diff --git 
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala 
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
new file mode 100644
index 000000000..6e90650ac
--- /dev/null
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.cli
+
+import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.nio.file.{Files, Paths}
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.cli.config.CliConfigManager
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.master.Master
+import org.apache.celeborn.service.deploy.worker.Worker
+
+/**
+ * Runs `CelebornCli.main` against a mini cluster with one worker and checks that the
+ * captured standard output contains an expected substring. Master commands go through
+ * the `unit-test` cluster alias registered in `beforeAll`; worker commands use
+ * `--hostport` with the worker's connection url.
+ */
+class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature 
{
+
+  private val celebornConf = new CelebornConf()
+  protected var master: Master = _
+  protected var worker: Worker = _
+
+  // Starts the mini cluster (random ports, one worker), then registers the
+  // `unit-test` alias pointing at the master's connection url through the CLI itself.
+  override def beforeAll(): Unit = {
+    logInfo("test initialized, setup celeborn mini cluster")
+    val (m, w) =
+      setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, 
workerNum = 1)
+    master = m
+    worker = w.head
+    super.beforeAll()
+    val aliasCommand = Array(
+      "master",
+      "--add-cluster-alias",
+      "unit-test",
+      "--host-list",
+      master.connectionUrl)
+    captureOutputAndValidateResponse(
+      aliasCommand,
+      s"Cluster alias unit-test added to 
${CliConfigManager.cliConfigFilePath}")
+  }
+
+  // Stops the mini cluster, removes the alias through the CLI, asserts it is no
+  // longer present in the loaded config, and deletes the CLI config file if it exists.
+  override def afterAll(): Unit = {
+    super.afterAll()
+    logInfo("all test complete, stop celeborn mini cluster")
+    shutdownMiniCluster()
+    val removeAliasCommand = Array(
+      "master",
+      "--remove-cluster-alias",
+      "unit-test")
+    captureOutputAndValidateResponse(removeAliasCommand, s"Cluster alias 
unit-test removed.")
+    val cliConfigManager = new CliConfigManager
+    val aliasExists = 
cliConfigManager.loadConfig().exists(_.cliConfigData.contains("unit-test"))
+    assert(!aliasExists)
+    if (new File(CliConfigManager.cliConfigFilePath).exists()) {
+      Files.delete(Paths.get(CliConfigManager.cliConfigFilePath))
+    }
+
+  }
+
+  test("worker --show-worker-info") {
+    val args = prepareWorkerArgs() :+ "--show-worker-info"
+    captureOutputAndValidateResponse(args, "WorkerInfoResponse")
+  }
+
+  test("worker --show-apps-on-worker") {
+    val args = prepareWorkerArgs() :+ "--show-apps-on-worker"
+    captureOutputAndValidateResponse(args, "ApplicationsResponse")
+  }
+
+  test("worker --show-shuffles-on-worker") {
+    val args = prepareWorkerArgs() :+ "--show-shuffles-on-worker"
+    captureOutputAndValidateResponse(args, "ShufflesResponse")
+  }
+
+  test("worker --show-top-disk-used-apps") {
+    val args = prepareWorkerArgs() :+ "--show-top-disk-used-apps"
+    captureOutputAndValidateResponse(args, "AppDiskUsagesResponse")
+  }
+
+  test("worker --show-partition-location-info") {
+    val args = prepareWorkerArgs() :+ "--show-partition-location-info"
+    captureOutputAndValidateResponse(args, "ShufflePartitionsResponse")
+  }
+
+  test("worker --show-unavailable-peers") {
+    val args = prepareWorkerArgs() :+ "--show-unavailable-peers"
+    captureOutputAndValidateResponse(args, "UnAvailablePeersResponse")
+  }
+
+  test("worker --is-shutdown") {
+    val args = prepareWorkerArgs() :+ "--is-shutdown"
+    captureOutputAndValidateResponse(args, "false")
+  }
+
+  test("worker --is-registered") {
+    val args = prepareWorkerArgs() :+ "--is-registered"
+    captureOutputAndValidateResponse(args, "true")
+  }
+
+  test("worker --show-conf") {
+    val args = prepareWorkerArgs() :+ "--show-conf"
+    captureOutputAndValidateResponse(args, "ConfResponse")
+  }
+
+  // NOTE(review): `cancel(...)` presumably aborts the test (ScalaTest semantics), so
+  // the statements after it would not run - confirm before relying on them.
+  test("worker --show-dynamic-conf") {
+    cancel("This test is temporarily disabled since dynamic conf is not 
enabled in unit tests.")
+    val args = prepareWorkerArgs() :+ "--show-dynamic-conf"
+    captureOutputAndValidateResponse(args, "")
+  }
+
+  test("worker --show-thread-dump") {
+    val args = prepareWorkerArgs() :+ "--show-thread-dump"
+    captureOutputAndValidateResponse(args, "ThreadStackResponse")
+  }
+
+  test("master --show-masters-info") {
+    cancel("This test is temporarily disabled since HA is not enabled in the 
unit tests.")
+    val args = prepareMasterArgs() :+ "--show-masters-info"
+    captureOutputAndValidateResponse(args, "")
+  }
+
+  test("master --show-cluster-apps") {
+    val args = prepareMasterArgs() :+ "--show-cluster-apps"
+    captureOutputAndValidateResponse(args, "ApplicationsHeartbeatResponse")
+  }
+
+  test("master --show-cluster-shuffles") {
+    val args = prepareMasterArgs() :+ "--show-cluster-shuffles"
+    captureOutputAndValidateResponse(args, "ShufflesResponse")
+  }
+
+  test("master --show-top-disk-used-apps") {
+    val args = prepareMasterArgs() :+ "--show-top-disk-used-apps"
+    captureOutputAndValidateResponse(args, "AppDiskUsageSnapshotsResponse")
+  }
+
+  test("master --show-worker-event-info") {
+    val args = prepareMasterArgs() :+ "--show-worker-event-info"
+    captureOutputAndValidateResponse(args, "WorkerEventsResponse")
+  }
+
+  test("master --show-lost-workers") {
+    val args = prepareMasterArgs() :+ "--show-lost-workers"
+    captureOutputAndValidateResponse(args, "No lost workers found.")
+  }
+
+  test("master --show-excluded-workers") {
+    val args = prepareMasterArgs() :+ "--show-excluded-workers"
+    captureOutputAndValidateResponse(args, "No excluded workers found.")
+  }
+
+  test("master --show-shutdown-workers") {
+    val args = prepareMasterArgs() :+ "--show-shutdown-workers"
+    captureOutputAndValidateResponse(args, "No shutdown workers found.")
+  }
+
+  test("master --show-lifecycle-managers") {
+    val args = prepareMasterArgs() :+ "--show-lifecycle-managers"
+    captureOutputAndValidateResponse(args, "HostnamesResponse")
+  }
+
+  test("master --show-workers") {
+    val args = prepareMasterArgs() :+ "--show-workers"
+    captureOutputAndValidateResponse(args, "WorkersResponse")
+  }
+
+  test("master --show-conf") {
+    val args = prepareMasterArgs() :+ "--show-conf"
+    captureOutputAndValidateResponse(args, "ConfResponse")
+  }
+
+  test("master --show-dynamic-conf") {
+    cancel("This test is temporarily disabled since dynamic conf is not 
enabled in unit tests.")
+    val args = prepareMasterArgs() :+ "--show-dynamic-conf"
+    captureOutputAndValidateResponse(args, "")
+  }
+
+  test("master --show-thread-dump") {
+    val args = prepareMasterArgs() :+ "--show-thread-dump"
+    captureOutputAndValidateResponse(args, "ThreadStackResponse")
+  }
+
+  // Excludes the test worker and then removes the exclusion, in that order.
+  test("master --exclude-worker and --remove-excluded-worker") {
+    val excludeArgs = prepareMasterArgs() ++ Array(
+      "--exclude-worker",
+      "--worker-ids",
+      getWorkerId())
+    captureOutputAndValidateResponse(excludeArgs, "success: true")
+    val removeExcludedArgs = prepareMasterArgs() ++ Array(
+      "--remove-excluded-worker",
+      "--worker-ids",
+      getWorkerId())
+    captureOutputAndValidateResponse(removeExcludedArgs, "success: true")
+  }
+
+  test("master --send-worker-event") {
+    val args = prepareMasterArgs() ++ Array(
+      "--send-worker-event",
+      "DECOMMISSION",
+      "--worker-ids",
+      getWorkerId())
+    captureOutputAndValidateResponse(args, "success: true")
+  }
+
+  // Common prefix of master commands: targets the `unit-test` cluster alias.
+  private def prepareMasterArgs(): Array[String] = {
+    Array(
+      "master",
+      "--cluster",
+      "unit-test")
+  }
+
+  // Common prefix of worker commands: targets the test worker's connection url.
+  private def prepareWorkerArgs(): Array[String] = {
+    Array(
+      "worker",
+      "--hostport",
+      worker.connectionUrl)
+  }
+
+  // Runs the CLI with `args` while Console output is redirected into a buffer, then
+  // asserts the buffer is non-empty and contains `stdoutValidationString`.
+  private def captureOutputAndValidateResponse(
+      args: Array[String],
+      stdoutValidationString: String): Unit = {
+    val stdoutStream = new ByteArrayOutputStream()
+    val stdoutPrintStream = new PrintStream(stdoutStream)
+    Console.withOut(stdoutPrintStream) {
+      CelebornCli.main(args)
+    }
+    val stdout = stdoutStream.toString
+    assert(stdout.nonEmpty && stdout.contains(stdoutValidationString))
+  }
+
+  // Builds the colon separated worker id: host, rpc port, the two push/fetch server
+  // ports (`_1` then `_2`), and the replicate server port.
+  private def getWorkerId(): String = {
+    
s"${worker.workerArgs.host}:${worker.rpcEnv.address.port}:${worker.getPushFetchServerPort._1}"
 +
+      s":${worker.getPushFetchServerPort._2}:${worker.replicateServer.getPort}"
+  }
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 02650dc1b..c452cbc0c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -990,6 +990,17 @@ object Utils extends Logging {
     }
   }
 
+  /**
+   * Evaluates `f` once to obtain a resource, applies `func` to it and closes the
+   * resource afterwards, whether `func` returns normally or throws.
+   *
+   * @param f    by-name expression producing the resource; evaluated exactly once
+   * @param func computation to run against the resource
+   * @return the value returned by `func`
+   */
+  def tryWithResources[R <: Closeable, U](f: => R)(func: R => U): U = {
+    val res = f
+    try {
+      // Pass `res`, not `f`: `f` is by-name, so referencing it again would build a
+      // second resource that is handed to `func` and never closed.
+      func(res)
+    } finally {
+      if (null != res) {
+        res.close()
+      }
+    }
+  }
+
   def toTransportMessage(message: Any): Any = {
     message match {
       case legacy: Message =>
diff --git a/pom.xml b/pom.xml
index 2e06cf2b2..9f91c1552 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
     <module>service</module>
     <module>master</module>
     <module>worker</module>
+    <module>cli</module>
   </modules>
 
   <distributionManagement>
@@ -104,6 +105,7 @@
     <jackson.version>2.15.3</jackson.version>
     <snappy.version>1.1.10.5</snappy.version>
     <ap.loader.version>3.0-8</ap.loader.version>
+    <picocli.version>4.7.6</picocli.version>
 
     <!-- Db dependencies -->
     <mybatis.version>3.5.15</mybatis.version>
@@ -708,6 +710,11 @@
         <version>${bouncycastle.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>info.picocli</groupId>
+        <artifactId>picocli</artifactId>
+        <version>${picocli.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 31816fe2c..ddddcf366 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -80,6 +80,7 @@ object Dependencies {
   val httpClient5Version = "5.3.1"
   val httpCore5Version = "5.2.4"
   val javaxAnnotationApiVersion = "1.3.2"
+  val picocliVersion = "4.7.6"
 
   // For SSL support
   val bouncycastleVersion = "1.77"
@@ -203,6 +204,8 @@ object Dependencies {
   // SSL support
   val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" % 
bouncycastleVersion % "test"
   val bouncycastleBcpkixJdk18on = "org.bouncycastle" % "bcpkix-jdk18on" % 
bouncycastleVersion % "test"
+
+  val picocli = "info.picocli" % "picocli" % picocliVersion
 }
 
 object CelebornCommonSettings {
@@ -355,7 +358,8 @@ object CelebornBuild extends sbt.internal.BuildDef {
       CelebornClient.client,
       CelebornService.service,
       CelebornWorker.worker,
-      CelebornMaster.master) ++ maybeSparkClientModules ++ 
maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
+      CelebornMaster.master,
+      CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules 
++ maybeMRClientModules ++ maybeWebModules
   }
 
   // ThisBuild / parallelExecution := false
@@ -455,6 +459,20 @@ object Utils {
   }
 }
 
+object CelebornCli {
+  // sbt project for the CLI module rooted at `cli/`. Every upstream project is wired
+  // with the same mapping: test classes to test, compile classes to compile.
+  lazy val cli = Project("celeborn-cli", file("cli"))
+    .dependsOn(
+      CelebornCommon.common % "test->test;compile->compile",
+      CelebornMaster.master % "test->test;compile->compile",
+      CelebornWorker.worker % "test->test;compile->compile",
+      CelebornOpenApi.openApiClient % "test->test;compile->compile")
+    .settings(
+      commonSettings,
+      libraryDependencies ++= Seq(Dependencies.picocli) ++ commonUnitTestDependencies)
+}
+
 object CelebornSpi {
   lazy val spi = Project("celeborn-spi", file("spi"))
     .settings(
diff --git a/sbin/celeborn-cli b/sbin/celeborn-cli
new file mode 100755
index 000000000..ed89f22f8
--- /dev/null
+++ b/sbin/celeborn-cli
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default CELEBORN_HOME to the parent of this script's directory when unset or empty.
+if [ -z "${CELEBORN_HOME}" ]; then
+  CELEBORN_HOME="$(cd "$(dirname "$0")"/..; pwd)"
+  export CELEBORN_HOME
+fi
+
+# Source the shared environment, then replace this shell with bin/celeborn-class
+# running the CLI main class, forwarding all arguments unchanged.
+. "${CELEBORN_HOME}/sbin/load-celeborn-env.sh"
+exec "${CELEBORN_HOME}/bin/celeborn-class" org.apache.celeborn.cli.CelebornCli "$@"

Reply via email to