This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 03d9acaa862a HDFS-17361. DiskBalancer: Query command support with multiple nodes (#6508)
03d9acaa862a is described below

commit 03d9acaa862a438d6fdf833dbbbf1ccbff246f9d
Author: huhaiyang <huhaiyang...@126.com>
AuthorDate: Mon Feb 19 08:34:59 2024 +0800

    HDFS-17361. DiskBalancer: Query command support with multiple nodes (#6508)
    
    Signed-off-by: Takanobu Asanuma <tasan...@apache.org>
---
 .../server/diskbalancer/command/QueryCommand.java  | 89 ++++++++++++++--------
 .../apache/hadoop/hdfs/tools/DiskBalancerCLI.java  |  6 +-
 .../src/site/markdown/HDFSDiskbalancer.md          |  4 +-
 .../command/TestDiskBalancerCommand.java           | 33 +++++++-
 4 files changed, 94 insertions(+), 38 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java
index f5d4cfacd826..4796cdfa5268 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hdfs.server.diskbalancer.command;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.TextStringBuilder;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -30,6 +32,11 @@ import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
 import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
 import org.apache.hadoop.net.NetUtils;
 
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
 /**
  * Gets the current status of disk balancer command.
  */
@@ -41,9 +48,13 @@ public class QueryCommand extends Command {
    * @param conf - Configuration.
    */
   public QueryCommand(Configuration conf) {
-    super(conf);
+    this(conf, System.out);
+  }
+
+  public QueryCommand(Configuration conf, final PrintStream ps) {
+    super(conf, ps);
     addValidCommandParameters(DiskBalancerCLI.QUERY,
-        "Queries the status of disk plan running on a given datanode.");
+        "Queries the status of disk plan running on given datanode(s).");
     addValidCommandParameters(DiskBalancerCLI.VERBOSE,
         "Prints verbose results.");
   }
@@ -56,37 +67,55 @@ public class QueryCommand extends Command {
   @Override
   public void execute(CommandLine cmd) throws Exception {
     LOG.info("Executing \"query plan\" command.");
+    TextStringBuilder result = new TextStringBuilder();
     Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.QUERY));
     verifyCommandOptions(DiskBalancerCLI.QUERY, cmd);
-    String nodeName = cmd.getOptionValue(DiskBalancerCLI.QUERY);
-    Preconditions.checkNotNull(nodeName);
-    nodeName = nodeName.trim();
-    String nodeAddress = nodeName;
-
-    // if the string is not name:port format use the default port.
-    if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
-      int defaultIPC = NetUtils.createSocketAddr(
-          getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
-              DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
-      nodeAddress = nodeName + ":" + defaultIPC;
-      LOG.debug("Using default data node port :  {}", nodeAddress);
+    String nodeVal = cmd.getOptionValue(DiskBalancerCLI.QUERY);
+    if (StringUtils.isBlank(nodeVal)) {
+      String warnMsg = "The number of input nodes is 0. "
+          + "Please input the valid nodes.";
+      throw new DiskBalancerException(warnMsg,
+          DiskBalancerException.Result.INVALID_NODE);
     }
+    nodeVal = nodeVal.trim();
+    Set<String> resultSet = new TreeSet<>();
+    String[] nodes = nodeVal.split(",");
+    Collections.addAll(resultSet, nodes);
+    String outputLine = String.format(
+        "Get current status of the diskbalancer for DataNode(s). "
+            + "These DataNode(s) are parsed from '%s'.", nodeVal);
+    recordOutput(result, outputLine);
+    for (String nodeName : resultSet) {
+      // if the string is not name:port format use the default port.
+      String nodeAddress = nodeName;
+      if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
+        int defaultIPC = NetUtils.createSocketAddr(
+            getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
+                DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
+        nodeAddress = nodeName + ":" + defaultIPC;
+        LOG.debug("Using default data node port :  {}", nodeAddress);
+      }
 
-    ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
-    try {
-      DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
-      System.out.printf("Plan File: %s%nPlan ID: %s%nResult: %s%n",
-              workStatus.getPlanFile(),
-              workStatus.getPlanID(),
-              workStatus.getResult().toString());
-
-      if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
-        System.out.printf("%s", workStatus.currentStateString());
+      ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
+      try {
+        DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
+        outputLine = String.format("DataNode: %s%nPlan File: %s%nPlan ID: %s%nResult: %s%n",
+            nodeAddress,
+            workStatus.getPlanFile(),
+            workStatus.getPlanID(),
+            workStatus.getResult().toString());
+        result.append(outputLine);
+        if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
+          outputLine = String.format("%s", workStatus.currentStateString());
+          result.append(outputLine);
+        }
+        result.append(System.lineSeparator());
+      } catch (DiskBalancerException ex) {
+        LOG.error("Query plan failed by {}", nodeAddress, ex);
+        throw ex;
       }
-    } catch (DiskBalancerException ex) {
-      LOG.error("Query plan failed.", ex);
-      throw ex;
     }
+    getPrintStream().println(result);
   }
 
   /**
@@ -94,14 +123,14 @@ public class QueryCommand extends Command {
    */
   @Override
   public void printHelp() {
-    String header = "Query Plan queries a given data node about the " +
+    String header = "Query Plan queries given datanode(s) about the " +
         "current state of disk balancer execution.\n\n";
 
     String footer = "\nQuery command retrievs the plan ID and the current " +
         "running state. ";
-
     HelpFormatter helpFormatter = new HelpFormatter();
-    helpFormatter.printHelp("hdfs diskbalancer -query <hostname>  [options]",
+    helpFormatter.printHelp("hdfs diskbalancer -query <hostname,hostname,...> " +
+            " [options]",
         header, DiskBalancerCLI.getQueryOptions(), footer);
   }
 }
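
The change above replaces the single-node lookup with a parse-then-iterate
loop: the option value is split on commas into a sorted set, and each entry
is normalized to host:port before the per-node query. A minimal standalone
sketch of that address-normalization logic (not code from the commit;
DEFAULT_IPC_PORT is a hypothetical stand-in for the value QueryCommand reads
from DFS_DATANODE_IPC_ADDRESS_KEY via NetUtils.createSocketAddr()):

    import java.util.Collections;
    import java.util.Set;
    import java.util.TreeSet;

    public class NodeListParsing {
      // Hypothetical default; the real command derives the port from the
      // configured DataNode IPC address.
      private static final int DEFAULT_IPC_PORT = 9867;

      /** Splits "host1,host2:9867,..." into sorted host:port addresses. */
      static Set<String> toAddresses(String nodeVal) {
        Set<String> nodes = new TreeSet<>();
        Collections.addAll(nodes, nodeVal.trim().split(","));
        Set<String> addresses = new TreeSet<>();
        for (String node : nodes) {
          // Entries that are not already host:port get the default port.
          addresses.add(node.matches("[^\\:]+:[0-9]{2,5}")
              ? node : node + ":" + DEFAULT_IPC_PORT);
        }
        return addresses;
      }

      public static void main(String[] args) {
        // Prints [dn1.example.com:9867, dn2.example.com:9867]
        System.out.println(toAddresses("dn2.example.com,dn1.example.com:9867"));
      }
    }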
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
index 14b6e8949c59..d853f9684e1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
@@ -378,7 +378,7 @@ public class DiskBalancerCLI extends Configured implements Tool {
     Option query = Option.builder().longOpt(QUERY)
         .hasArg()
         .desc("Queries the disk balancer " +
-            "status of a given datanode.")
+            "status of given datanode(s).")
         .build();
     getQueryOptions().addOption(query);
     opt.addOption(query);
@@ -387,7 +387,7 @@ public class DiskBalancerCLI extends Configured implements Tool {
     // added to global table.
     Option verbose = Option.builder().longOpt(VERBOSE)
         .desc("Prints details of the plan that is being executed " +
-            "on the node.")
+            "on the datanode(s).")
         .build();
     getQueryOptions().addOption(verbose);
   }
@@ -482,7 +482,7 @@ public class DiskBalancerCLI extends Configured implements Tool {
       }
 
       if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
-        dbCmd = new QueryCommand(getConf());
+        dbCmd = new QueryCommand(getConf(), this.printStream);
       }
 
       if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {
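
The one-line wiring change above passes the CLI's PrintStream into
QueryCommand, so the command's output can be captured rather than written
straight to System.out. A sketch of the capture pattern this enables (assumed
setup for illustration, not code from the commit):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;
    import java.nio.charset.StandardCharsets;

    public class CaptureExample {
      public static void main(String[] args) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        PrintStream capture = new PrintStream(bytes, true, StandardCharsets.UTF_8);
        // A command constructed with 'capture' writes here, not to stdout.
        capture.println("DataNode: localhost:9867");
        String output = bytes.toString(StandardCharsets.UTF_8);
        System.out.print(output); // a test would assert on 'output' instead
      }
    }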
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md
index d3b1ca8f2e9f..24e88edb053f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md
@@ -86,9 +86,9 @@ So, query command can help to get the current status of execute command.
 
 ### Query
 
-Query command gets the current status of the diskbalancer from a datanode.
+Query command gets the current status of the diskbalancer from specified node(s).
 
-`hdfs diskbalancer -query nodename.mycluster.com`
+`hdfs diskbalancer -query nodename1.mycluster.com,nodename2.mycluster.com,...`
 
 | COMMAND\_OPTION | Description |
 |:---- |:---- |
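
The documented command can also be driven programmatically through
ToolRunner, which is essentially how the test below exercises it. A sketch
under the assumption of DataNodes reachable on their IPC ports (hostnames are
placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
    import org.apache.hadoop.util.ToolRunner;

    public class QueryMultipleNodes {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Comma-separated node list; entries without an explicit port get
        // the configured DataNode IPC port appended by QueryCommand.
        int rc = ToolRunner.run(conf, new DiskBalancerCLI(conf),
            new String[] {"-query", "dn1.example.com:9867,dn2.example.com"});
        System.exit(rc);
      }
    }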
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
index d0107b507ade..1cdfcdcebc71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
@@ -814,17 +814,44 @@ public class TestDiskBalancerCommand {
     return runCommandInternal(cmdLine, clusterConf);
   }
 
+  /**
+   * Making sure that we can query the multiple nodes without having done a submit.
+   * @throws Exception
+   */
+  @Test
+  public void testDiskBalancerQueryWithoutSubmitAndMultipleNodes() throws Exception {
+    Configuration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    final int numDatanodes = 2;
+    File basedir = new File(GenericTestUtils.getRandomizedTempPath());
+    MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
+        .numDataNodes(numDatanodes).build();
+    try {
+      miniDFSCluster.waitActive();
+      DataNode dataNode1 = miniDFSCluster.getDataNodes().get(0);
+      DataNode dataNode2 = miniDFSCluster.getDataNodes().get(1);
+      final String queryArg = String.format("-query localhost:%d,localhost:%d", dataNode1
+          .getIpcPort(), dataNode2.getIpcPort());
+      final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
+      List<String> outputs = runCommand(cmdLine);
+      assertThat(outputs.get(1), containsString("localhost:" + dataNode1.getIpcPort()));
+      assertThat(outputs.get(6), containsString("localhost:" + dataNode2.getIpcPort()));
+    } finally {
+      miniDFSCluster.shutdown();
+    }
+  }
+
   /**
    * Making sure that we can query the node without having done a submit.
    * @throws Exception
    */
   @Test
   public void testDiskBalancerQueryWithoutSubmit() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    Configuration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
     final int numDatanodes = 2;
     File basedir = new File(GenericTestUtils.getRandomizedTempPath());
-    MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf, basedir)
+    MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
         .numDataNodes(numDatanodes).build();
     try {
       miniDFSCluster.waitActive();

