This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 03d9acaa862a HDFS-17361. DiskBalancer: Query command support with multiple nodes (#6508) 03d9acaa862a is described below commit 03d9acaa862a438d6fdf833dbbbf1ccbff246f9d Author: huhaiyang <huhaiyang...@126.com> AuthorDate: Mon Feb 19 08:34:59 2024 +0800 HDFS-17361. DiskBalancer: Query command support with multiple nodes (#6508) Signed-off-by: Takanobu Asanuma <tasan...@apache.org> --- .../server/diskbalancer/command/QueryCommand.java | 89 ++++++++++++++-------- .../apache/hadoop/hdfs/tools/DiskBalancerCLI.java | 6 +- .../src/site/markdown/HDFSDiskbalancer.md | 4 +- .../command/TestDiskBalancerCommand.java | 33 +++++++- 4 files changed, 94 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java index f5d4cfacd826..4796cdfa5268 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.diskbalancer.command; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.text.TextStringBuilder; import org.apache.hadoop.util.Preconditions; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; @@ -30,6 +32,11 @@ import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException; import org.apache.hadoop.hdfs.tools.DiskBalancerCLI; import org.apache.hadoop.net.NetUtils; +import java.io.PrintStream; +import java.util.Collections; +import java.util.Set; +import java.util.TreeSet; + /** * Gets the current status of disk balancer command. 
*/ @@ -41,9 +48,13 @@ public class QueryCommand extends Command { * @param conf - Configuration. */ public QueryCommand(Configuration conf) { - super(conf); + this(conf, System.out); + } + + public QueryCommand(Configuration conf, final PrintStream ps) { + super(conf, ps); addValidCommandParameters(DiskBalancerCLI.QUERY, - "Queries the status of disk plan running on a given datanode."); + "Queries the status of disk plan running on given datanode(s)."); addValidCommandParameters(DiskBalancerCLI.VERBOSE, "Prints verbose results."); } @@ -56,37 +67,55 @@ public class QueryCommand extends Command { @Override public void execute(CommandLine cmd) throws Exception { LOG.info("Executing \"query plan\" command."); + TextStringBuilder result = new TextStringBuilder(); Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.QUERY)); verifyCommandOptions(DiskBalancerCLI.QUERY, cmd); - String nodeName = cmd.getOptionValue(DiskBalancerCLI.QUERY); - Preconditions.checkNotNull(nodeName); - nodeName = nodeName.trim(); - String nodeAddress = nodeName; - - // if the string is not name:port format use the default port. - if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) { - int defaultIPC = NetUtils.createSocketAddr( - getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, - DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); - nodeAddress = nodeName + ":" + defaultIPC; - LOG.debug("Using default data node port : {}", nodeAddress); + String nodeVal = cmd.getOptionValue(DiskBalancerCLI.QUERY); + if (StringUtils.isBlank(nodeVal)) { + String warnMsg = "The number of input nodes is 0. " + + "Please input the valid nodes."; + throw new DiskBalancerException(warnMsg, + DiskBalancerException.Result.INVALID_NODE); } + nodeVal = nodeVal.trim(); + Set<String> resultSet = new TreeSet<>(); + String[] nodes = nodeVal.split(","); + Collections.addAll(resultSet, nodes); + String outputLine = String.format( + "Get current status of the diskbalancer for DataNode(s). 
" + + "These DataNode(s) are parsed from '%s'.", nodeVal); + recordOutput(result, outputLine); + for (String nodeName : resultSet) { + // if the string is not name:port format use the default port. + String nodeAddress = nodeName; + if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) { + int defaultIPC = NetUtils.createSocketAddr( + getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, + DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); + nodeAddress = nodeName + ":" + defaultIPC; + LOG.debug("Using default data node port : {}", nodeAddress); + } - ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress); - try { - DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan(); - System.out.printf("Plan File: %s%nPlan ID: %s%nResult: %s%n", - workStatus.getPlanFile(), - workStatus.getPlanID(), - workStatus.getResult().toString()); - - if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) { - System.out.printf("%s", workStatus.currentStateString()); + ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress); + try { + DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan(); + outputLine = String.format("DataNode: %s%nPlan File: %s%nPlan ID: %s%nResult: %s%n", + nodeAddress, + workStatus.getPlanFile(), + workStatus.getPlanID(), + workStatus.getResult().toString()); + result.append(outputLine); + if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) { + outputLine = String.format("%s", workStatus.currentStateString()); + result.append(outputLine); + } + result.append(System.lineSeparator()); + } catch (DiskBalancerException ex) { + LOG.error("Query plan failed by {}", nodeAddress, ex); + throw ex; } - } catch (DiskBalancerException ex) { - LOG.error("Query plan failed.", ex); - throw ex; } + getPrintStream().println(result); } /** @@ -94,14 +123,14 @@ public class QueryCommand extends Command { */ @Override public void printHelp() { - String header = "Query Plan queries a given data node about the " + + String header = "Query Plan queries 
given datanode(s) about the " + "current state of disk balancer execution.\n\n"; String footer = "\nQuery command retrievs the plan ID and the current " + "running state. "; - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.printHelp("hdfs diskbalancer -query <hostname> [options]", + helpFormatter.printHelp("hdfs diskbalancer -query <hostname,hostname,...> " + + " [options]", header, DiskBalancerCLI.getQueryOptions(), footer); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java index 14b6e8949c59..d853f9684e1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java @@ -378,7 +378,7 @@ public class DiskBalancerCLI extends Configured implements Tool { Option query = Option.builder().longOpt(QUERY) .hasArg() .desc("Queries the disk balancer " + - "status of a given datanode.") + "status of given datanode(s).") .build(); getQueryOptions().addOption(query); opt.addOption(query); @@ -387,7 +387,7 @@ public class DiskBalancerCLI extends Configured implements Tool { // added to global table. 
Option verbose = Option.builder().longOpt(VERBOSE) .desc("Prints details of the plan that is being executed " + - "on the node.") + "on the datanode(s).") .build(); getQueryOptions().addOption(verbose); } @@ -482,7 +482,7 @@ public class DiskBalancerCLI extends Configured implements Tool { } if (cmd.hasOption(DiskBalancerCLI.QUERY)) { - dbCmd = new QueryCommand(getConf()); + dbCmd = new QueryCommand(getConf(), this.printStream); } if (cmd.hasOption(DiskBalancerCLI.CANCEL)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md index d3b1ca8f2e9f..24e88edb053f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md @@ -86,9 +86,9 @@ So, query command can help to get the current status of execute command. ### Query -Query command gets the current status of the diskbalancer from a datanode. +Query command gets the current status of the diskbalancer from specified node(s). 
-`hdfs diskbalancer -query nodename.mycluster.com` +`hdfs diskbalancer -query nodename1.mycluster.com,nodename2.mycluster.com,...` | COMMAND\_OPTION | Description | |:---- |:---- | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java index d0107b507ade..1cdfcdcebc71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java @@ -814,17 +814,44 @@ public class TestDiskBalancerCommand { return runCommandInternal(cmdLine, clusterConf); } + /** + * Making sure that we can query the multiple nodes without having done a submit. + * @throws Exception + */ + @Test + public void testDiskBalancerQueryWithoutSubmitAndMultipleNodes() throws Exception { + Configuration hdfsConf = new HdfsConfiguration(); + hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + final int numDatanodes = 2; + File basedir = new File(GenericTestUtils.getRandomizedTempPath()); + MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir) + .numDataNodes(numDatanodes).build(); + try { + miniDFSCluster.waitActive(); + DataNode dataNode1 = miniDFSCluster.getDataNodes().get(0); + DataNode dataNode2 = miniDFSCluster.getDataNodes().get(1); + final String queryArg = String.format("-query localhost:%d,localhost:%d", dataNode1 + .getIpcPort(), dataNode2.getIpcPort()); + final String cmdLine = String.format("hdfs diskbalancer %s", queryArg); + List<String> outputs = runCommand(cmdLine); + assertThat(outputs.get(1), containsString("localhost:" + dataNode1.getIpcPort())); + assertThat(outputs.get(6), containsString("localhost:" + dataNode2.getIpcPort())); + } 
finally { + miniDFSCluster.shutdown(); + } + } + /** * Making sure that we can query the node without having done a submit. * @throws Exception */ @Test public void testDiskBalancerQueryWithoutSubmit() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + Configuration hdfsConf = new HdfsConfiguration(); + hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); final int numDatanodes = 2; File basedir = new File(GenericTestUtils.getRandomizedTempPath()); - MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf, basedir) + MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir) .numDataNodes(numDatanodes).build(); try { miniDFSCluster.waitActive(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org