This is an automated email from the ASF dual-hosted git repository.

yiyang0203 pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 3529de4e28401745791a4f6b438a163320dfce91
Author: Symious <[email protected]>
AuthorDate: Tue Aug 16 10:40:22 2022 +0800

    HDDS-7106. [DiskBalancer] Client-SCM interface (#3663)
---
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |  41 +++++
 .../protocol/StorageContainerLocationProtocol.java |  30 ++++
 .../scm/storage/DiskBalancerConfiguration.java     | 165 +++++++++++++++++++
 .../org/apache/hadoop/hdds/conf/ConfigTag.java     |   3 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |  55 +++++++
 .../src/main/proto/ScmAdminProtocol.proto          |  21 +++
 .../interface-client/src/main/proto/hdds.proto     |  13 ++
 .../hadoop/hdds/scm/node/DiskBalancerManager.java  | 176 +++++++++++++++++++++
 .../hadoop/hdds/scm/node/DiskBalancerStatus.java   |  50 ++++++
 .../org/apache/hadoop/hdds/scm/node/NodeUtils.java | 150 ++++++++++++++++++
 ...inerLocationProtocolServerSideTranslatorPB.java |  33 +++-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  50 ++++++
 .../hdds/scm/server/StorageContainerManager.java   |   8 +
 .../hdds/scm/node/TestDiskBalancerManager.java     |  99 ++++++++++++
 .../hdds/scm/cli/ContainerOperationClient.java     |  34 ++++
 .../hadoop/ozone/scm/node/TestDiskBalancer.java    |  90 +++++++++++
 16 files changed, 1016 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index fb5a2deee2..478d08dff0 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -454,4 +454,45 @@ public interface ScmClient extends Closeable {
       String scmId) throws IOException;
 
   String getMetrics(String query) throws IOException;
+
+  /**
+   * Get DiskBalancer status.
+   * @param count top datanodes that need balancing
+   * @return List of DatanodeDiskBalancerInfo.
+   * @throws IOException
+   */
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count) throws IOException;
+
+  /**
+   * Get DiskBalancer status.
+   * @param hosts If hosts is not null, return status of hosts; If hosts is
+   *              null, return status of all datanodes in balancing.
+   * @return List of DatanodeDiskBalancerInfo.
+   * @throws IOException
+   */
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Start DiskBalancer.
+   */
+  void startDiskBalancer(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Stop DiskBalancer.
+   */
+  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+
+
+  /**
+   * Update DiskBalancer Configuration.
+   */
+  void updateDiskBalancerConfiguration(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 663f317a3b..2a7969049d 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -476,4 +476,34 @@ public interface StorageContainerLocationProtocol extends 
Closeable {
       String scmId) throws IOException;
 
   String getMetrics(String query) throws IOException;
+
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException;
+
+  /**
+   * Get DiskBalancer status.
+   */
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException;
+
+  /**
+   * Start DiskBalancer.
+   */
+  void startDiskBalancer(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Stop DiskBalancer.
+   */
+  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Update DiskBalancer Configuration.
+   */
+  void updateDiskBalancerConfiguration(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
new file mode 100644
index 0000000000..54709e4c17
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains configuration values for the DiskBalancer.
+ */
+@ConfigGroup(prefix = "hdds.datanode.disk.balancer")
+public final class DiskBalancerConfiguration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerConfiguration.class);
+
+  @Config(key = "volume.density.threshold", type = ConfigType.AUTO,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "Threshold is a percentage in the range of 0 to 100. A " +
+          "datanode is considered balanced if for each volume, the " +
+          "utilization of the volume(used space to capacity ratio) differs" +
+          " from the utilization of the datanode(used space to capacity ratio" 
+
+          " of the entire datanode) no more than the threshold.")
+  private double threshold = 10d;
+
+  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "The max balance speed.")
+  private double diskBandwidth = 10;
+
+  @Config(key = "parallel.thread", type = ConfigType.AUTO,
+      defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
+      description = "The max parallel balance thread count.")
+  private int parallelThread = 5;
+
+  /**
+   * Gets the threshold value for DiskBalancer.
+   *
+   * @return percentage value in the range 0 to 100
+   */
+  public double getThreshold() {
+    return threshold;
+  }
+
+  public double getThresholdAsRatio() {
+    return threshold / 100;
+  }
+
+  /**
+   * Sets the threshold value for Disk Balancer.
+   *
+   * @param threshold a percentage value in the range 0 to 100
+   */
+  public void setThreshold(double threshold) {
+    if (threshold < 0d || threshold >= 100d) {
+      throw new IllegalArgumentException(
+          "Threshold must be a percentage(double) in the range 0 to 100.");
+    }
+    this.threshold = threshold;
+  }
+
+  /**
+   * Gets the disk bandwidth value for Disk Balancer.
+   *
+   * @return max disk bandwidth per second
+   */
+  public double getDiskBandwidth() {
+    return diskBandwidth;
+  }
+
+  /**
+   * Sets the disk bandwidth value for Disk Balancer.
+   *
+   * @param diskBandwidth the bandwidth to control balance speed
+   */
+  public void setDiskBandwidth(double diskBandwidth) {
+    if (diskBandwidth <= 0d) {
+      throw new IllegalArgumentException(
+          "diskBandwidth must be a value larger than 0.");
+    }
+    this.diskBandwidth = diskBandwidth;
+  }
+
+  /**
+   * Gets the parallel thread for Disk Balancer.
+   *
+   * @return parallel thread
+   */
+  public int getParallelThread() {
+    return parallelThread;
+  }
+
+  /**
+   * Sets the parallel thread for Disk Balancer.
+   *
+   * @param parallelThread the parallel thread count
+   */
+  public void setParallelThread(int parallelThread) {
+    if (parallelThread <= 0) {
+      throw new IllegalArgumentException(
+          "parallelThread must be a value larger than 0.");
+    }
+    this.parallelThread = parallelThread;
+  }
+  @Override
+  public String toString() {
+    return String.format("Disk Balancer Configuration values:%n" +
+            "%-50s %s%n" +
+            "%-50s %s%n" +
+            "%-50s %s%n" +
+            "%-50s %s%n",
+            "Key", "Value",
+        "Threshold", threshold, "Max disk bandwidth", diskBandwidth,
+        "Parallel Thread", parallelThread);
+  }
+
+  public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() 
{
+    HddsProtos.DiskBalancerConfigurationProto.Builder builder =
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+
+    builder.setThreshold(threshold)
+        .setDiskBandwidth(diskBandwidth)
+        .setParallelThread(parallelThread);
+    return builder;
+  }
+
+  static DiskBalancerConfiguration fromProtobuf(
+      @Nonnull HddsProtos.DiskBalancerConfigurationProto proto,
+      @Nonnull OzoneConfiguration ozoneConfiguration) {
+    DiskBalancerConfiguration config =
+        ozoneConfiguration.getObject(DiskBalancerConfiguration.class);
+    if (proto.hasThreshold()) {
+      config.setThreshold(proto.getThreshold());
+    }
+    if (proto.hasDiskBandwidth()) {
+      config.setDiskBandwidth(proto.getDiskBandwidth());
+    }
+    if (proto.hasParallelThread()) {
+      config.setParallelThread(proto.getParallelThread());
+    }
+    return config;
+  }
+}
diff --git 
a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java 
b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 24feb69389..d4817ea592 100644
--- 
a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ 
b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@ -52,5 +52,6 @@ public enum ConfigTag {
   TLS,
   TOKEN,
   UPGRADE,
-  X509
+  X509,
+  DISKBALANCER
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 109358c67b..c1a48cf18f 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -47,6 +47,9 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
@@ -1114,6 +1117,58 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
     return response.getContainerCount();
   }
 
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+    DatanodeDiskBalancerInfoRequestProto request =
+        DatanodeDiskBalancerInfoRequestProto.newBuilder()
+            .setInfoType(DatanodeDiskBalancerInfoType.report)
+            .setCount(count)
+            .build();
+
+    DatanodeDiskBalancerInfoResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerInfo,
+            builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
+            .getDatanodeDiskBalancerInfoResponse();
+
+    return response.getInfoList();
+  }
+
  /**
   * Gets DiskBalancer status via the SCM admin protocol.
   *
   * <p>If {@code hosts} is present, only those hosts are included in the
   * request; otherwise the server decides which datanodes to report
   * (per the interface contract: all datanodes currently balancing).
   *
   * @param hosts optional list of host names to query
   * @param clientVersion version of the calling client
   * @return list of per-datanode DiskBalancer info protos
   * @throws IOException on RPC failure
   */
  @Override
  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
      Optional<List<String>> hosts, int clientVersion) throws IOException {
    DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
        DatanodeDiskBalancerInfoRequestProto.newBuilder()
            .setInfoType(DatanodeDiskBalancerInfoType.status);
    // Only attach the hosts filter when the caller supplied one.
    hosts.ifPresent(requestBuilder::addAllHosts);
    DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();

    DatanodeDiskBalancerInfoResponseProto response =
        submitRequest(Type.DatanodeDiskBalancerInfo,
            builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
            .getDatanodeDiskBalancerInfoResponse();

    return response.getInfoList();
  }
+
  @Override
  public void startDiskBalancer(Optional<Double> threshold,
      Optional<Double> bandwidth, Optional<List<String>> hosts)
      throws IOException {
    // NOTE(review): unimplemented stub — this call is currently a silent
    // no-op on the client side. TODO: wire up the StartDiskBalancer RPC in a
    // follow-up patch, or throw to make the missing support visible.
  }
+
  @Override
  public void stopDiskBalancer(Optional<List<String>> hosts)
      throws IOException {
    // NOTE(review): unimplemented stub — this call is currently a silent
    // no-op on the client side. TODO: wire up the StopDiskBalancer RPC in a
    // follow-up patch, or throw to make the missing support visible.
  }
+
  @Override
  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
      Optional<Double> bandwidth, Optional<List<String>> hosts)
      throws IOException {
    // NOTE(review): unimplemented stub — this call is currently a silent
    // no-op on the client side. TODO: wire up the update RPC in a follow-up
    // patch, or throw to make the missing support visible.
  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index e8b8d62394..4730a4ccc7 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -84,6 +84,7 @@ message ScmContainerLocationRequest {
   optional SingleNodeQueryRequestProto singleNodeQueryRequest = 45;
   optional GetContainersOnDecomNodeRequestProto 
getContainersOnDecomNodeRequest = 46;
   optional GetMetricsRequestProto getMetricsRequest = 47;
+  optional DatanodeDiskBalancerInfoRequestProto 
DatanodeDiskBalancerInfoRequest = 48;
 }
 
 message ScmContainerLocationResponse {
@@ -139,6 +140,7 @@ message ScmContainerLocationResponse {
   optional SingleNodeQueryResponseProto singleNodeQueryResponse = 45;
   optional GetContainersOnDecomNodeResponseProto 
getContainersOnDecomNodeResponse = 46;
   optional GetMetricsResponseProto getMetricsResponse = 47;
+  optional DatanodeDiskBalancerInfoResponseProto 
DatanodeDiskBalancerInfoResponse = 48;
 
   enum Status {
     OK = 1;
@@ -193,6 +195,7 @@ enum Type {
   SingleNodeQuery = 41;
   GetContainersOnDecomNode = 42;
   GetMetrics = 43;
+  DatanodeDiskBalancerInfo = 44;
 }
 
 /**
@@ -357,6 +360,24 @@ message DatanodeUsageInfoResponseProto {
   repeated DatanodeUsageInfoProto info = 1;
 }
 
+enum DatanodeDiskBalancerInfoType{
+  report = 1;
+  status = 2;
+}
+
+/*
+  Datanode disk balancer status request message.
+*/
+message DatanodeDiskBalancerInfoRequestProto {
+  required DatanodeDiskBalancerInfoType infoType = 1;
+  optional uint32 count = 2;
+  repeated string hosts = 3;
+}
+
+message DatanodeDiskBalancerInfoResponseProto {
+  repeated DatanodeDiskBalancerInfoProto info = 1;
+}
+
 /*
   Decommission a list of hosts
 */
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 4058453123..803842ed2d 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -521,3 +521,16 @@ message InnerNode {
     optional uint32 numOfLeaves = 2;
     repeated ChildrenMap childrenMap = 3;
 }
+
+message DiskBalancerConfigurationProto {
+    optional double threshold = 1;
+    optional double diskBandwidth = 2;
+    optional int32 parallelThread = 3;
+}
+
+message DatanodeDiskBalancerInfoProto {
+    required DatanodeDetailsProto node = 1;
+    required double currentVolumeDensitySum = 2;
+    optional bool diskBalancerRunning = 3;
+    optional DiskBalancerConfigurationProto diskBalancerConf = 4;
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
new file mode 100644
index 0000000000..0b16c1aa95
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -0,0 +1,176 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdds.scm.node;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;

/**
 * Maintains information about the DiskBalancer on SCM side.
 *
 * <p>Tracks, per datanode, whether DiskBalancer is running and with which
 * configuration ({@link DiskBalancerStatus}), and answers the client-facing
 * report/status queries. The status map is a {@link ConcurrentHashMap} so
 * queries and updates may come from different threads.
 */
public class DiskBalancerManager {
  public static final Logger LOG =
      LoggerFactory.getLogger(DiskBalancerManager.class);

  private final EventPublisher scmNodeEventPublisher;
  private final SCMContext scmContext;
  private final NodeManager nodeManager;
  // Per-datanode balancing state; absent key means "never started".
  private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
  // Whether to resolve hosts by hostname instead of IP (mirrors the HDFS
  // dfs.datanode.use.datanode.hostname setting).
  private boolean useHostnames;

  /**
   * Constructs DiskBalancer Manager.
   *
   * @param conf           ozone configuration
   * @param eventPublisher publisher for SCM node events
   * @param scmContext     SCM HA context
   * @param nodeManager    source of datanode membership and storage reports
   */
  public DiskBalancerManager(OzoneConfiguration conf,
                        EventPublisher eventPublisher,
                        SCMContext scmContext,
                        NodeManager nodeManager) {
    this.scmNodeEventPublisher = eventPublisher;
    this.scmContext = scmContext;
    this.nodeManager = nodeManager;
    this.useHostnames = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
    this.statusMap = new ConcurrentHashMap<>();
  }

  /**
   * Returns the {@code count} healthy in-service datanodes with the highest
   * volume density sum, i.e. the ones most in need of balancing, in
   * descending order.
   *
   * @param count         max number of entries to return
   * @param clientVersion version used to encode DatanodeDetails
   * @return sorted, truncated report list
   * @throws IOException declared for interface symmetry
   */
  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
      int count, int clientVersion) throws IOException {

    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList =
        new ArrayList<>();

    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
        HddsProtos.NodeState.HEALTHY)) {
      double volumeDensitySum =
          getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
      reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
          .setCurrentVolumeDensitySum(volumeDensitySum)
          .setNode(datanodeDetails.toProto(clientVersion))
          .build());
    }

    // Highest density (least balanced) first.
    reportList.sort((t1, t2) -> Double.compare(t2.getCurrentVolumeDensitySum(),
        t1.getCurrentVolumeDensitySum()));
    return reportList.stream().limit(count).collect(Collectors.toList());
  }

  /**
   * If hosts is not null, return status of hosts;
   * If hosts is null, return status of all datanodes in balancing.
   *
   * @param hosts         optional host filter
   * @param clientVersion version used to encode DatanodeDetails
   * @return status list for the matching healthy in-service datanodes
   * @throws IOException if a host in the filter cannot be resolved
   */
  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
      Optional<List<String>> hosts, int clientVersion) throws IOException {
    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
        new ArrayList<>();
    List<DatanodeDetails> filterDns = null;
    if (hosts.isPresent() && !hosts.get().isEmpty()) {
      filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
          useHostnames);
    }

    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
        HddsProtos.NodeState.HEALTHY)) {
      if (shouldReturnDatanode(filterDns, datanodeDetails)) {
        double volumeDensitySum =
            getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
        statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
            .setCurrentVolumeDensitySum(volumeDensitySum)
            .setDiskBalancerRunning(isRunning(datanodeDetails))
            .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
                    DiskBalancerStatus.DUMMY_STATUS)
                .getDiskBalancerConfiguration().toProtobufBuilder())
            .setNode(datanodeDetails.toProto(clientVersion))
            .build());
      }
    }
    return statusList;
  }

  // With no filter: only datanodes currently balancing are returned.
  // With a filter: return exactly the filtered datanodes, running or not.
  private boolean shouldReturnDatanode(List<DatanodeDetails> hosts,
      DatanodeDetails datanodeDetails) {
    if (hosts == null || hosts.isEmpty()) {
      return isRunning(datanodeDetails);
    } else {
      return hosts.contains(datanodeDetails);
    }
  }

  /**
   * Get volume density for a specific DatanodeDetails node.
   *
   * <p>Density is the sum over all volumes of |volume utilization −
   * datanode-wide utilization|; 0 means perfectly balanced.
   *
   * @param datanodeDetails DatanodeDetails (must be a DatanodeInfo carrying
   *                        storage reports with non-zero total capacity)
   * @return DiskBalancer volume density sum.
   */
  private double getVolumeDataDensitySumForDatanodeDetails(
      DatanodeDetails datanodeDetails) {
    Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo,
        "Expected a DatanodeInfo with storage reports");

    DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails;

    double totalCapacity = 0d, totalUsed = 0d;
    for (StorageContainerDatanodeProtocolProtos.StorageReportProto
        reportProto : datanodeInfo.getStorageReports()) {
      totalCapacity += reportProto.getCapacity();
      totalUsed += reportProto.getScmUsed();
    }

    // Guard the division below; a node with no reported capacity has no
    // meaningful density.
    Preconditions.checkArgument(totalCapacity != 0,
        "Datanode reported zero total capacity: " + datanodeDetails);
    double idealUsage = totalUsed / totalCapacity;

    // mapToDouble directly — avoids boxing each value into a Double.
    return datanodeInfo.getStorageReports().stream()
        .mapToDouble(report ->
            Math.abs((double) report.getScmUsed() / report.getCapacity()
                - idealUsage))
        .sum();
  }

  private boolean isRunning(DatanodeDetails datanodeDetails) {
    return statusMap
        .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS)
        .isRunning();
  }

  @VisibleForTesting
  public void addRunningDatanode(DatanodeDetails datanodeDetails) {
    statusMap.put(datanodeDetails, new DiskBalancerStatus(true,
        new DiskBalancerConfiguration()));
  }
}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
new file mode 100644
index 0000000000..a064a6ff9f
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Maintains DiskBalancerConfiguration and isRunning.
+ */
+public class DiskBalancerStatus {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerStatus.class);
+
+  private boolean isRunning;
+  private DiskBalancerConfiguration diskBalancerConfiguration;
+
+  public static final DiskBalancerStatus DUMMY_STATUS =
+      new DiskBalancerStatus(false, new DiskBalancerConfiguration());
+
+  public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) 
{
+    this.isRunning = isRunning;
+    this.diskBalancerConfiguration = conf;
+  }
+
+  public boolean isRunning() {
+    return isRunning;
+  }
+
+  public DiskBalancerConfiguration getDiskBalancerConfiguration() {
+    return diskBalancerConfiguration;
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
new file mode 100644
index 0000000000..6df74d3746
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
@@ -0,0 +1,150 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdds.scm.node;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

/**
 * Util class for Node operations.
 */
public final class NodeUtils {

  private static final Logger LOG = LoggerFactory.getLogger(NodeUtils.class);

  private NodeUtils() {
  }

  /**
   * Resolves each "host" or "host:port" string to a registered datanode.
   *
   * @param nodeManager  source of registered datanodes
   * @param hosts        host strings, optionally with a port suffix
   * @param useHostnames resolve via DNS hostname instead of IP address
   * @return one DatanodeDetails per input host, in input order
   * @throws InvalidHostStringException if a host cannot be resolved, is not
   *         running a registered datanode, or the port does not match
   */
  public static List<DatanodeDetails> mapHostnamesToDatanodes(
      NodeManager nodeManager, List<String> hosts, boolean useHostnames)
      throws InvalidHostStringException {
    // ArrayList: only appended to and returned; no list-head operations.
    List<DatanodeDetails> results = new ArrayList<>(hosts.size());
    for (String hostString : hosts) {
      HostDefinition host = new HostDefinition(hostString);
      InetAddress addr;
      try {
        addr = InetAddress.getByName(host.getHostname());
      } catch (UnknownHostException e) {
        throw new InvalidHostStringException("Unable to resolve host "
            + host.getRawHostname(), e);
      }
      String dnsName;
      if (useHostnames) {
        dnsName = addr.getHostName();
      } else {
        dnsName = addr.getHostAddress();
      }
      List<DatanodeDetails> found = nodeManager.getNodesByAddress(dnsName);
      if (found.isEmpty()) {
        throw new InvalidHostStringException("Host " + host.getRawHostname()
            + " (" + dnsName + ") is not running any datanodes registered"
            + " with SCM."
            + " Please check the host name.");
      } else if (found.size() == 1) {
        // Single match: a port, if given, must agree with the datanode.
        if (host.getPort() != -1 &&
            !validateDNPortMatch(host.getPort(), found.get(0))) {
          throw new InvalidHostStringException("Host " + host.getRawHostname()
              + " is running a datanode registered with SCM,"
              + " but the port number doesn't match."
              + " Please check the port number.");
        }
        results.add(found.get(0));
      } else {
        // Multiple datanodes share this address: disambiguate by port.
        // NOTE(review): when no port was given, getPort() is -1 and no
        // datanode can match, so this always throws — confirm whether a
        // port should be required for multi-datanode hosts.
        DatanodeDetails match = null;
        for (DatanodeDetails dn : found) {
          if (validateDNPortMatch(host.getPort(), dn)) {
            match = dn;
            break;
          }
        }
        if (match == null) {
          throw new InvalidHostStringException("Host " + host.getRawHostname()
              + " is running multiple datanodes registered with SCM,"
              + " but no port numbers match."
              + " Please check the port number.");
        }
        results.add(match);
      }
    }
    return results;
  }

  /**
   * Check if the passed port is used by the given DatanodeDetails object. If
   * it is, return true, otherwise return false.
   * @param port Port number to check if it is used by the datanode
   * @param dn Datanode to check if it is using the given port
   * @return True if port is used by the datanode. False otherwise.
   */
  private static boolean validateDNPortMatch(int port, DatanodeDetails dn) {
    for (DatanodeDetails.Port p : dn.getPorts()) {
      if (p.getValue() == port) {
        return true;
      }
    }
    return false;
  }

  /**
   * Parses a raw "host" or "host:port" string; port is -1 when absent.
   */
  static class HostDefinition {
    private final String rawHostname;
    private String hostname;
    private int port;

    HostDefinition(String hostname) throws InvalidHostStringException {
      this.rawHostname = hostname;
      parseHostname();
    }

    public String getRawHostname() {
      return rawHostname;
    }

    public String getHostname() {
      return hostname;
    }

    public int getPort() {
      return port;
    }

    private void parseHostname() throws InvalidHostStringException {
      try {
        // A URI *must* have a scheme, so just create a fake one
        URI uri = new URI("empty://" + rawHostname.trim());
        this.hostname = uri.getHost();
        this.port = uri.getPort();

        if (this.hostname == null) {
          throw new InvalidHostStringException("The string " + rawHostname +
              " does not contain a valid hostname or hostname:port" +
              " definition");
        }
      } catch (URISyntaxException e) {
        throw new InvalidHostStringException(
            "Unable to parse the hoststring " + rawHostname, e);
      }
    }
  }
}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index a44536bf44..71a8edad41 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
@@ -722,6 +723,14 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
             .setStatus(Status.OK)
             .setGetMetricsResponse(getMetrics(request.getGetMetricsRequest()))
             .build();
+      case DatanodeDiskBalancerInfo:
+        return ScmContainerLocationResponse.newBuilder()
+          .setCmdType(request.getCmdType())
+          .setStatus(Status.OK)
+          .setDatanodeDiskBalancerInfoResponse(getDatanodeDiskBalancerInfo(
+              request.getDatanodeDiskBalancerInfoRequest(),
+              request.getVersion()))
+          .build();
       default:
         throw new IllegalArgumentException(
             "Unknown command type: " + request.getCmdType());
@@ -1293,7 +1302,29 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
   public DecommissionScmResponseProto decommissionScm(
       DecommissionScmRequestProto request) throws IOException {
     return impl.decommissionScm(
-        request.getScmId());
+            request.getScmId());
+  }
+
+  /**
+   * Handles a DatanodeDiskBalancerInfo request by dispatching on the
+   * requested info type (report or status).
+   *
+   * @throws IOException if the underlying query fails or the info type is
+   *         not recognised.
+   */
+  public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo(
+      StorageContainerLocationProtocolProtos.
+          DatanodeDiskBalancerInfoRequestProto request, int clientVersion)
+      throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> infoProtoList;
+    switch (request.getInfoType()) {
+    case report:
+      infoProtoList = impl.getDiskBalancerReport(request.getCount(),
+          clientVersion);
+      break;
+    case status:
+      infoProtoList = impl.getDiskBalancerStatus(
+          Optional.of(request.getHostsList()), clientVersion);
+      break;
+    default:
+      // The previous fall-through left infoProtoList null, which made the
+      // protobuf builder's addAllInfo(null) below fail with an unhelpful
+      // NullPointerException. Fail with a descriptive error instead.
+      throw new IOException(
+          "Unknown DiskBalancer info type: " + request.getInfoType());
+    }
+    return DatanodeDiskBalancerInfoResponseProto.newBuilder()
+        .addAllInfo(infoProtoList)
+        .build();
+  }
 
   public GetMetricsResponseProto getMetrics(GetMetricsRequestProto request) 
throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 2df2a4847e..6da9907db3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1274,6 +1274,56 @@ public class SCMClientProtocolServer implements
         ContainerID.valueOf(startContainerID), count, state);
   }
 
+  /**
+   * Returns disk balancer report entries for up to {@code count} datanodes,
+   * as produced by the DiskBalancerManager. Admin access is required.
+   *
+   * NOTE(review): the sibling methods below carry @Override — presumably
+   * this method is also declared on the protocol interface; confirm and
+   * annotate if so.
+   */
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+    // check admin authorisation
+    try {
+      getScm().checkAdminAccess(getRemoteUser(), true);
+    } catch (IOException e) {
+      // Surface the failed authorization in the SCM log before rethrowing.
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+
+    return scm.getDiskBalancerManager().getDiskBalancerReport(count,
+        clientVersion);
+  }
+
+  /**
+   * Returns the disk balancer status for the requested hosts. Admin access
+   * is required; unauthorized callers are rejected with an IOException.
+   */
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    // Reject non-admin callers before touching the DiskBalancerManager.
+    try {
+      getScm().checkAdminAccess(getRemoteUser(), true);
+    } catch (IOException authFailure) {
+      LOG.error("Authorization failed", authFailure);
+      throw authFailure;
+    }
+
+    return scm.getDiskBalancerManager()
+        .getDiskBalancerStatus(hosts, clientVersion);
+  }
+
+  /**
+   * Starts disk balancing on datanodes. Currently a placeholder: no command
+   * is sent yet (see TODO), so the threshold/bandwidth/hosts arguments are
+   * accepted but unused.
+   */
+  @Override
+  public void startDiskBalancer(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    // TODO: Send message to datanodes
+  }
+
+  /**
+   * Stops disk balancing on datanodes. Currently a placeholder: no command
+   * is sent yet (see TODO), so the hosts argument is accepted but unused.
+   */
+  @Override
+  public void stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException {
+    // TODO: Send message to datanodes
+  }
+
+
+  /**
+   * Updates the disk balancer configuration on datanodes. Currently a
+   * placeholder: no command is sent yet (see TODO), so all arguments are
+   * accepted but unused.
+   */
+  @Override
+  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    // TODO: Send message to datanodes
+  }
+
   /**
    * Queries a list of Node that match a set of statuses.
    *
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 11fdc0d16d..7b14cf2e1b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
 import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler;
 import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService;
 import org.apache.hadoop.hdds.scm.security.RootCARotationManager;
@@ -259,6 +260,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
   private FinalizationManager finalizationManager;
   private HDDSLayoutVersionManager scmLayoutVersionManager;
   private LeaseManager<Object> leaseManager;
+  private DiskBalancerManager diskBalancerManager;
 
   private SCMMetadataStore scmMetadataStore;
   private CertificateStore certificateStore;
@@ -854,6 +856,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer())
         .setRatisServer(scmHAManager.getRatisServer())
         .build();
+    diskBalancerManager = new DiskBalancerManager(conf, eventQueue, scmContext,
+        scmNodeManager);
   }
 
   /**
@@ -2007,6 +2011,10 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     return serviceManager;
   }
 
+  public DiskBalancerManager getDiskBalancerManager() {
+    return diskBalancerManager;
+  }
+
   /**
    * Force SCM out of safe mode.
    */
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
new file mode 100644
index 0000000000..541c9764b7
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Unit tests for the DiskBalancer manager.
+ */
+
+public class TestDiskBalancerManager {
+
+  private DiskBalancerManager balancerManager;
+  private NodeManager mockNodes;
+  private OzoneConfiguration config;
+  private String metaDir;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    config = new OzoneConfiguration();
+    metaDir = GenericTestUtils.getTempPath(
+        TestDiskBalancerManager.class.getSimpleName() + UUID.randomUUID());
+    config.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir);
+    mockNodes = new MockNodeManager(true, 3);
+    balancerManager = new DiskBalancerManager(config, new EventQueue(),
+        SCMContext.emptyContext(), mockNodes);
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerReport() throws IOException {
+    // A report capped at two entries; entries come back ordered by volume
+    // density sum, highest first (as asserted below).
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> report =
+        balancerManager.getDiskBalancerReport(2,
+            ClientVersion.CURRENT_VERSION);
+
+    Assertions.assertEquals(2, report.size());
+    Assertions.assertTrue(report.get(0).getCurrentVolumeDensitySum()
+        >= report.get(1).getCurrentVolumeDensitySum());
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerStatus() throws IOException {
+    balancerManager.addRunningDatanode(mockNodes.getAllNodes().get(0));
+
+    // Query the status of every datanode in the cluster (three of them).
+    List<String> hosts = mockNodes.getAllNodes().stream()
+        .map(DatanodeDetails::getIpAddress)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(3,
+        balancerManager.getDiskBalancerStatus(Optional.of(hosts),
+            ClientVersion.CURRENT_VERSION).size());
+
+    // Query the status of a single datanode.
+    hosts = mockNodes.getAllNodes().stream()
+        .map(DatanodeDetails::getIpAddress)
+        .limit(1)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(1,
+        balancerManager.getDiskBalancerStatus(Optional.of(hosts),
+            ClientVersion.CURRENT_VERSION).size());
+  }
+}
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 499d58b1ff..cb3fdb42f8 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -568,4 +568,38 @@ public class ContainerOperationClient implements ScmClient 
{
     return storageContainerLocationClient.getMetrics(query);
   }
 
+  /**
+   * Fetches the disk balancer report for up to {@code count} datanodes,
+   * using the current client version.
+   */
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count) throws IOException {
+    return storageContainerLocationClient.getDiskBalancerReport(count,
+        ClientVersion.CURRENT_VERSION);
+  }
+
+  /**
+   * Delegates a start-disk-balancer request to SCM via the container
+   * location protocol.
+   */
+  @Override
+  public void startDiskBalancer(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    storageContainerLocationClient.startDiskBalancer(threshold, bandwidth,
+        hosts);
+  }
+
+  /**
+   * Delegates a stop-disk-balancer request to SCM via the container
+   * location protocol.
+   */
+  @Override
+  public void stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException {
+    storageContainerLocationClient.stopDiskBalancer(hosts);
+  }
+
+  /**
+   * Fetches disk balancer status for the given hosts from SCM, using the
+   * current client version. Semantics of an absent hosts list are decided
+   * server-side.
+   */
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts) throws IOException {
+    return storageContainerLocationClient.getDiskBalancerStatus(hosts,
+        ClientVersion.CURRENT_VERSION);
+  }
+
+  /**
+   * Delegates a disk balancer configuration update to SCM via the container
+   * location protocol.
+   */
+  @Override
+  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    storageContainerLocationClient.updateDiskBalancerConfiguration(threshold,
+        bandwidth, hosts);
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
new file mode 100644
index 0000000000..c047cd3ee0
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This class tests disk balancer operations.
+ */
+@Timeout(300)
+public class TestDiskBalancer {
+
+  // Shared fixtures, created once for the whole class in setup().
+  private static ScmClient storageClient;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConf;
+
+  @BeforeEach
+  public static void setup() throws Exception {
+    ozoneConf = new OzoneConfiguration();
+    // Use the capacity-based container placement policy for this cluster.
+    ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+    cluster = 
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
+    storageClient = new ContainerOperationClient(ozoneConf);
+    cluster.waitForClusterToBeReady();
+
+    // Inject synthetic storage reports so the disk balancer report has
+    // usage data to rank. NOTE(review): presumably getRandomNodeReport(3, 1)
+    // builds a report with 3 volumes per node — confirm in HddsTestUtils.
+    for (DatanodeDetails dn: cluster.getStorageContainerManager()
+        .getScmNodeManager().getAllNodes()) {
+      ((DatanodeInfo) dn).updateStorageReports(
+          HddsTestUtils.getRandomNodeReport(3, 1).getStorageReportList());
+    }
+  }
+
+  @AfterAll
+  public static void cleanup() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerReport() throws IOException {
+    // Ask for a two-entry report and verify it is capped at two entries
+    // ordered by volume density sum, highest first.
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList =
+        storageClient.getDiskBalancerReport(2);
+
+    assertEquals(2, reportProtoList.size());
+    assertTrue(
+        reportProtoList.get(0).getCurrentVolumeDensitySum()
+        >= reportProtoList.get(1).getCurrentVolumeDensitySum());
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerStatus() throws IOException {
+    // TODO: Test status command with datanodes in balancing
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to