This is an automated email from the ASF dual-hosted git repository.

yiyang0203 pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 9094b0e839820d7ba8a1d672193596326719c9d9
Author: Symious <[email protected]>
AuthorDate: Tue Aug 16 10:40:22 2022 +0800

    HDDS-7106. [DiskBalancer] Client-SCM interface (#3663)
---
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |  41 +++++
 .../protocol/StorageContainerLocationProtocol.java |  30 ++++
 .../scm/storage/DiskBalancerConfiguration.java     | 165 +++++++++++++++++++
 .../org/apache/hadoop/hdds/conf/ConfigTag.java     |   3 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |  55 +++++++
 .../src/main/proto/ScmAdminProtocol.proto          |  21 +++
 .../interface-client/src/main/proto/hdds.proto     |  15 +-
 .../hadoop/hdds/scm/node/DiskBalancerManager.java  | 176 +++++++++++++++++++++
 .../hadoop/hdds/scm/node/DiskBalancerStatus.java   |  50 ++++++
 .../hdds/scm/node/NodeDecommissionManager.java     | 123 +-------------
 .../org/apache/hadoop/hdds/scm/node/NodeUtils.java | 150 ++++++++++++++++++
 ...inerLocationProtocolServerSideTranslatorPB.java |  31 ++++
 .../hdds/scm/server/SCMClientProtocolServer.java   |  50 ++++++
 .../hdds/scm/server/StorageContainerManager.java   |   8 +
 .../hdds/scm/node/TestDiskBalancerManager.java     |  99 ++++++++++++
 .../hdds/scm/node/TestNodeDecommissionManager.java |  13 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |  36 +++++
 .../hadoop/ozone/scm/node/TestDiskBalancer.java    |  96 +++++++++++
 18 files changed, 1036 insertions(+), 126 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 916bd7fdfb..d4f6267c49 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -420,4 +420,45 @@ public interface ScmClient extends Closeable {
   StatusAndMessages queryUpgradeFinalizationProgress(
       String upgradeClientID, boolean force, boolean readonly)
       throws IOException;
+
+  /**
+   * Get DiskBalancer status.
+   * @param count top datanodes that need balancing
+   * @return List of DatanodeDiskBalancerInfo.
+   * @throws IOException
+   */
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count) throws IOException;
+
+  /**
+   * Get DiskBalancer status.
+   * @param hosts If hosts is not null, return status of hosts; If hosts is
+   *              null, return status of all datanodes in balancing.
+   * @return List of DatanodeDiskBalancerInfo.
+   * @throws IOException
+   */
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Start DiskBalancer.
+   */
+  void startDiskBalancer(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Stop DiskBalancer.
+   */
+  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+
+
+  /**
+   * Update DiskBalancer Configuration.
+   */
+  void updateDiskBalancerConfiguration(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 7690b2eefb..d3e02a5104 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -455,4 +455,34 @@ public interface StorageContainerLocationProtocol extends Closeable {
   List<ContainerInfo> getListOfContainers(
       long startContainerID, int count, HddsProtos.LifeCycleState state)
       throws IOException;
+
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException;
+
+  /**
+   * Get DiskBalancer status.
+   */
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException;
+
+  /**
+   * Start DiskBalancer.
+   */
+  void startDiskBalancer(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Stop DiskBalancer.
+   */
+  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+
+  /**
+   * Update DiskBalancer Configuration.
+   */
+  void updateDiskBalancerConfiguration(
+      Optional<Double> threshold,
+      Optional<Double> bandwidth,
+      Optional<List<String>> hosts) throws IOException;
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
new file mode 100644
index 0000000000..704b383679
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains configuration values for the DiskBalancer.
+ */
+@ConfigGroup(prefix = "hdds.datanode.disk.balancer")
+public final class DiskBalancerConfiguration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerConfiguration.class);
+
+  @Config(key = "volume.density.threshold", type = ConfigType.AUTO,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "Threshold is a percentage in the range of 0 to 100. A " +
+          "datanode is considered balanced if for each volume, the " +
+          "utilization of the volume(used space to capacity ratio) differs" +
+          " from the utilization of the datanode(used space to capacity ratio" +
+          " of the entire datanode) no more than the threshold.")
+  private double threshold = 10d;
+
+  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "The max balance speed.")
+  private double diskBandwidth = 10;
+
+  @Config(key = "parallel.thread", type = ConfigType.AUTO,
+      defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
+      description = "The max parallel balance thread count.")
+  private int parallelThread = 5;
+
+  /**
+   * Gets the threshold value for DiskBalancer.
+   *
+   * @return percentage value in the range 0 to 100
+   */
+  public double getThreshold() {
+    return threshold;
+  }
+
+  public double getThresholdAsRatio() {
+    return threshold / 100;
+  }
+
+  /**
+   * Sets the threshold value for Disk Balancer.
+   *
+   * @param threshold a percentage value in the range 0 to 100
+   */
+  public void setThreshold(double threshold) {
+    if (threshold < 0d || threshold >= 100d) {
+      throw new IllegalArgumentException(
+          "Threshold must be a percentage(double) in the range 0 to 100.");
+    }
+    this.threshold = threshold;
+  }
+
+  /**
+   * Gets the disk bandwidth value for Disk Balancer.
+   *
+   * @return max disk bandwidth per second
+   */
+  public double getDiskBandwidth() {
+    return diskBandwidth;
+  }
+
+  /**
+   * Sets the disk bandwidth value for Disk Balancer.
+   *
+   * @param diskBandwidth the bandwidth to control balance speed
+   */
+  public void setDiskBandwidth(double diskBandwidth) {
+    if (diskBandwidth <= 0d) {
+      throw new IllegalArgumentException(
+          "diskBandwidth must be a value larger than 0.");
+    }
+    this.diskBandwidth = diskBandwidth;
+  }
+
+  /**
+   * Gets the parallel thread for Disk Balancer.
+   *
+   * @return parallel thread
+   */
+  public int getParallelThread() {
+    return parallelThread;
+  }
+
+  /**
+   * Sets the parallel thread for Disk Balancer.
+   *
+   * @param parallelThread the parallel thread count
+   */
+  public void setParallelThread(int parallelThread) {
+    if (parallelThread <= 0) {
+      throw new IllegalArgumentException(
+          "parallelThread must be a value larger than 0.");
+    }
+    this.parallelThread = parallelThread;
+  }
+  @Override
+  public String toString() {
+    return String.format("Disk Balancer Configuration values:%n" +
+            "%-50s %s%n" +
+            "%-50s %s%n" +
+            "%-50s %s%n" +
+            "%-50s %s%n",
+            "Key", "Value",
+        "Threshold", threshold, "Max disk bandwidth", diskBandwidth,
+        "Parallel Thread", parallelThread);
+  }
+
+  public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() {
+    HddsProtos.DiskBalancerConfigurationProto.Builder builder =
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+
+    builder.setThreshold(threshold)
+        .setDiskBandwidth(diskBandwidth)
+        .setParallelThread(parallelThread);
+    return builder;
+  }
+
+  static DiskBalancerConfiguration fromProtobuf(
+      @NotNull HddsProtos.DiskBalancerConfigurationProto proto,
+      @NotNull OzoneConfiguration ozoneConfiguration) {
+    DiskBalancerConfiguration config =
+        ozoneConfiguration.getObject(DiskBalancerConfiguration.class);
+    if (proto.hasThreshold()) {
+      config.setThreshold(proto.getThreshold());
+    }
+    if (proto.hasDiskBandwidth()) {
+      config.setDiskBandwidth(proto.getDiskBandwidth());
+    }
+    if (proto.hasParallelThread()) {
+      config.setParallelThread(proto.getParallelThread());
+    }
+    return config;
+  }
+}
diff --git 
a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java 
b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 24feb69389..d4817ea592 100644
--- 
a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ 
b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@ -52,5 +52,6 @@ public enum ConfigTag {
   TLS,
   TOKEN,
   UPGRADE,
-  X509
+  X509,
+  DISKBALANCER
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index c664a42ae6..dfb9873adc 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -46,6 +46,9 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
@@ -1060,6 +1063,58 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
     return response.getContainerCount();
   }
 
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+    DatanodeDiskBalancerInfoRequestProto request =
+        DatanodeDiskBalancerInfoRequestProto.newBuilder()
+            .setInfoType(DatanodeDiskBalancerInfoType.report)
+            .setCount(count)
+            .build();
+
+    DatanodeDiskBalancerInfoResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerInfo,
+            builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
+            .getDatanodeDiskBalancerInfoResponse();
+
+    return response.getInfoList();
+  }
+
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
+        DatanodeDiskBalancerInfoRequestProto.newBuilder()
+            .setInfoType(DatanodeDiskBalancerInfoType.status);
+    hosts.ifPresent(requestBuilder::addAllHosts);
+    DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();
+
+    DatanodeDiskBalancerInfoResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerInfo,
+            builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
+            .getDatanodeDiskBalancerInfoResponse();
+
+    return response.getInfoList();
+  }
+
+  @Override
+  public void startDiskBalancer(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+
+  }
+
+  @Override
+  public void stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException {
+
+  }
+
+  @Override
+  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index d5a3c6f65a..26fdd47ca4 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -80,6 +80,7 @@ message ScmContainerLocationRequest {
  optional ResetDeletedBlockRetryCountRequestProto resetDeletedBlockRetryCountRequest = 41;
  optional TransferLeadershipRequestProto  transferScmLeadershipRequest = 42;
  optional GetFailedDeletedBlocksTxnRequestProto getFailedDeletedBlocksTxnRequest = 43;
+  optional DatanodeDiskBalancerInfoRequestProto DatanodeDiskBalancerInfoRequest = 44;
 }
 
 message ScmContainerLocationResponse {
@@ -131,6 +132,7 @@ message ScmContainerLocationResponse {
  optional ResetDeletedBlockRetryCountResponseProto resetDeletedBlockRetryCountResponse = 41;
  optional TransferLeadershipResponseProto  transferScmLeadershipResponse = 42;
  optional GetFailedDeletedBlocksTxnResponseProto getFailedDeletedBlocksTxnResponse = 43;
+  optional DatanodeDiskBalancerInfoResponseProto DatanodeDiskBalancerInfoResponse = 44;
 
   enum Status {
     OK = 1;
@@ -181,6 +183,7 @@ enum Type {
   GetClosedContainerCount = 37;
   TransferLeadership = 38;
   GetFailedDeletedBlocksTransaction = 39;
+  DatanodeDiskBalancerInfo = 40;
 }
 
 /**
@@ -332,6 +335,24 @@ message DatanodeUsageInfoResponseProto {
   repeated DatanodeUsageInfoProto info = 1;
 }
 
+enum DatanodeDiskBalancerInfoType{
+  report = 1;
+  status = 2;
+}
+
+/*
+  Datanode disk balancer status request message.
+*/
+message DatanodeDiskBalancerInfoRequestProto {
+  required DatanodeDiskBalancerInfoType infoType = 1;
+  optional uint32 count = 2;
+  repeated string hosts = 3;
+}
+
+message DatanodeDiskBalancerInfoResponseProto {
+  repeated DatanodeDiskBalancerInfoProto info = 1;
+}
+
 /*
   Decommission a list of hosts
 */
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 16ea4887aa..f251449223 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -465,4 +465,17 @@ message DeletedBlocksTransactionInfo {
     optional int64 containerID = 2;
     repeated int64 localID = 3;
     optional int32 count = 4;
-}
\ No newline at end of file
+}
+
+message DiskBalancerConfigurationProto {
+    optional double threshold = 1;
+    optional double diskBandwidth = 2;
+    optional int32 parallelThread = 3;
+}
+
+message DatanodeDiskBalancerInfoProto {
+    required DatanodeDetailsProto node = 1;
+    required double currentVolumeDensitySum = 2;
+    optional bool diskBalancerRunning = 3;
+    optional DiskBalancerConfigurationProto diskBalancerConf = 4;
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
new file mode 100644
index 0000000000..5ca66e9eef
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Maintains information about the DiskBalancer on SCM side.
+ */
+public class DiskBalancerManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerManager.class);
+
+  private final EventPublisher scmNodeEventPublisher;
+  private final SCMContext scmContext;
+  private final NodeManager nodeManager;
+  private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private boolean useHostnames;
+
+  /**
+   * Constructs DiskBalancer Manager.
+   */
+  public DiskBalancerManager(OzoneConfiguration conf,
+                        EventPublisher eventPublisher,
+                        SCMContext scmContext,
+                        NodeManager nodeManager) {
+    this.scmNodeEventPublisher = eventPublisher;
+    this.scmContext = scmContext;
+    this.nodeManager = nodeManager;
+    this.useHostnames = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.statusMap = new ConcurrentHashMap<>();
+  }
+
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList =
+        new ArrayList<>();
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      double volumeDensitySum =
+          getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+      reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+          .setCurrentVolumeDensitySum(volumeDensitySum)
+          .setNode(datanodeDetails.toProto(clientVersion))
+          .build());
+    }
+
+    reportList.sort((t1, t2) -> Double.compare(t2.getCurrentVolumeDensitySum(),
+        t1.getCurrentVolumeDensitySum()));
+    return reportList.stream().limit(count).collect(Collectors.toList());
+  }
+
+  /**
+   * If hosts is not null, return status of hosts;
+   * If hosts is null, return status of all datanodes in balancing.
+   */
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
+        new ArrayList<>();
+    List<DatanodeDetails> filterDns = null;
+    if (hosts.isPresent() && !hosts.get().isEmpty()) {
+      filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+          useHostnames);
+    }
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      if (shouldReturnDatanode(filterDns, datanodeDetails)) {
+        double volumeDensitySum =
+            getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+        statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+            .setCurrentVolumeDensitySum(volumeDensitySum)
+            .setDiskBalancerRunning(isRunning(datanodeDetails))
+            .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
+                    DiskBalancerStatus.DUMMY_STATUS)
+                .getDiskBalancerConfiguration().toProtobufBuilder())
+            .setNode(datanodeDetails.toProto(clientVersion))
+            .build());
+      }
+    }
+    return statusList;
+  }
+
+  private boolean shouldReturnDatanode(List<DatanodeDetails> hosts,
+      DatanodeDetails datanodeDetails) {
+    if (hosts == null || hosts.isEmpty()) {
+      return isRunning(datanodeDetails);
+    } else {
+      return hosts.contains(datanodeDetails);
+    }
+  }
+
+  /**
+   * Get volume density for a specific DatanodeDetails node.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @return DiskBalancer report.
+   */
+  private double getVolumeDataDensitySumForDatanodeDetails(
+      DatanodeDetails datanodeDetails) {
+    Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo);
+
+    DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails;
+
+    double totalCapacity = 0d, totalUsed = 0d;
+    for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto :
+        datanodeInfo.getStorageReports()) {
+      totalCapacity += reportProto.getCapacity();
+      totalUsed += reportProto.getScmUsed();
+    }
+
+    Preconditions.checkArgument(totalCapacity != 0);
+    double idealUsage = totalUsed / totalCapacity;
+
+    double volumeDensitySum = datanodeInfo.getStorageReports().stream()
+        .map(report ->
+            Math.abs((double)report.getScmUsed() / report.getCapacity()
+                - idealUsage))
+        .mapToDouble(Double::valueOf).sum();
+
+    return volumeDensitySum;
+  }
+
+  private boolean isRunning(DatanodeDetails datanodeDetails) {
+    return statusMap
+        .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS)
+        .isRunning();
+  }
+
+  @VisibleForTesting
+  public void addRunningDatanode(DatanodeDetails datanodeDetails) {
+    statusMap.put(datanodeDetails, new DiskBalancerStatus(true,
+        new DiskBalancerConfiguration()));
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
new file mode 100644
index 0000000000..ed22e80e3c
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Maintains DiskBalancerConfiguration and isRunning.
+ */
+public class DiskBalancerStatus {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerStatus.class);
+
+  private boolean isRunning;
+  private DiskBalancerConfiguration diskBalancerConfiguration;
+
+  public static final DiskBalancerStatus DUMMY_STATUS =
+      new DiskBalancerStatus(false, new DiskBalancerConfiguration());
+
+  public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) {
+    this.isRunning = isRunning;
+    this.diskBalancerConfiguration = conf;
+  }
+
+  public boolean isRunning() {
+    return isRunning;
+  }
+
+  public DiskBalancerConfiguration getDiskBalancerConfiguration() {
+    return diskBalancerConfiguration;
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index a84b07d513..80f81782ea 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -32,12 +32,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -66,115 +61,6 @@ public class NodeDecommissionManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(NodeDecommissionManager.class);
 
-  static class HostDefinition {
-    private String rawHostname;
-    private String hostname;
-    private int port;
-
-    HostDefinition(String hostname) throws InvalidHostStringException {
-      this.rawHostname = hostname;
-      parseHostname();
-    }
-
-    public String getRawHostname() {
-      return rawHostname;
-    }
-
-    public String getHostname() {
-      return hostname;
-    }
-
-    public int getPort() {
-      return port;
-    }
-
-    private void parseHostname() throws InvalidHostStringException {
-      try {
-        // A URI *must* have a scheme, so just create a fake one
-        URI uri = new URI("empty://" + rawHostname.trim());
-        this.hostname = uri.getHost();
-        this.port = uri.getPort();
-
-        if (this.hostname == null) {
-          throw new InvalidHostStringException("The string " + rawHostname +
-              " does not contain a value hostname or hostname:port definition");
-        }
-      } catch (URISyntaxException e) {
-        throw new InvalidHostStringException(
-            "Unable to parse the hoststring " + rawHostname, e);
-      }
-    }
-  }
-
-  private List<DatanodeDetails> mapHostnamesToDatanodes(List<String> hosts)
-      throws InvalidHostStringException {
-    List<DatanodeDetails> results = new LinkedList<>();
-    for (String hostString : hosts) {
-      HostDefinition host = new HostDefinition(hostString);
-      InetAddress addr;
-      try {
-        addr = InetAddress.getByName(host.getHostname());
-      } catch (UnknownHostException e) {
-        throw new InvalidHostStringException("Unable to resolve host "
-            + host.getRawHostname(), e);
-      }
-      String dnsName;
-      if (useHostnames) {
-        dnsName = addr.getHostName();
-      } else {
-        dnsName = addr.getHostAddress();
-      }
-      List<DatanodeDetails> found = nodeManager.getNodesByAddress(dnsName);
-      if (found.size() == 0) {
-        throw new InvalidHostStringException("Host " + host.getRawHostname()
-            + " (" + dnsName + ") is not running any datanodes registered"
-            + " with SCM."
-            + " Please check the host name.");
-      } else if (found.size() == 1) {
-        if (host.getPort() != -1 &&
-            !validateDNPortMatch(host.getPort(), found.get(0))) {
-          throw new InvalidHostStringException("Host " + host.getRawHostname()
-              + " is running a datanode registered with SCM,"
-              + " but the port number doesn't match."
-              + " Please check the port number.");
-        }
-        results.add(found.get(0));
-      } else if (found.size() > 1) {
-        DatanodeDetails match = null;
-        for (DatanodeDetails dn : found) {
-          if (validateDNPortMatch(host.getPort(), dn)) {
-            match = dn;
-            break;
-          }
-        }
-        if (match == null) {
-          throw new InvalidHostStringException("Host " + host.getRawHostname()
-              + " is running multiple datanodes registered with SCM,"
-              + " but no port numbers match."
-              + " Please check the port number.");
-        }
-        results.add(match);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Check if the passed port is used by the given DatanodeDetails object. If
-   * it is, return true, otherwise return false.
-   * @param port Port number to check if it is used by the datanode
-   * @param dn Datanode to check if it is using the given port
-   * @return True if port is used by the datanode. False otherwise.
-   */
-  private boolean validateDNPortMatch(int port, DatanodeDetails dn) {
-    for (DatanodeDetails.Port p : dn.getPorts()) {
-      if (p.getValue() == port) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
              ContainerManager containerManager, SCMContext scmContext,
              EventPublisher eventQueue, ReplicationManager rm) {
@@ -225,7 +111,8 @@ public class NodeDecommissionManager {
 
   public synchronized List<DatanodeAdminError> decommissionNodes(
       List<String> nodes) throws InvalidHostStringException {
-    List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+    List<DatanodeDetails> dns = NodeUtils.mapHostnamesToDatanodes(nodeManager,
+        nodes, useHostnames);
     List<DatanodeAdminError> errors = new ArrayList<>();
     for (DatanodeDetails dn : dns) {
       try {
@@ -290,7 +177,8 @@ public class NodeDecommissionManager {
 
   public synchronized List<DatanodeAdminError> recommissionNodes(
       List<String> nodes) throws InvalidHostStringException {
-    List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+    List<DatanodeDetails> dns = NodeUtils.mapHostnamesToDatanodes(nodeManager,
+        nodes, useHostnames);
     List<DatanodeAdminError> errors = new ArrayList<>();
     for (DatanodeDetails dn : dns) {
       try {
@@ -327,7 +215,8 @@ public class NodeDecommissionManager {
 
   public synchronized List<DatanodeAdminError> startMaintenanceNodes(
       List<String> nodes, int endInHours) throws InvalidHostStringException {
-    List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+    List<DatanodeDetails> dns = NodeUtils.mapHostnamesToDatanodes(nodeManager,
+        nodes, useHostnames);
     List<DatanodeAdminError> errors = new ArrayList<>();
     for (DatanodeDetails dn : dns) {
       try {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
new file mode 100644
index 0000000000..6df74d3746
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Util class for Node operations.
+ */
+public final class NodeUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NodeUtils.class);
+
+  private NodeUtils() {
+  }
+
+  public static List<DatanodeDetails> mapHostnamesToDatanodes(
+      NodeManager nodeManager, List<String> hosts, boolean useHostnames)
+      throws InvalidHostStringException {
+    List<DatanodeDetails> results = new LinkedList<>();
+    for (String hostString : hosts) {
+      HostDefinition host = new HostDefinition(hostString);
+      InetAddress addr;
+      try {
+        addr = InetAddress.getByName(host.getHostname());
+      } catch (UnknownHostException e) {
+        throw new InvalidHostStringException("Unable to resolve host "
+            + host.getRawHostname(), e);
+      }
+      String dnsName;
+      if (useHostnames) {
+        dnsName = addr.getHostName();
+      } else {
+        dnsName = addr.getHostAddress();
+      }
+      List<DatanodeDetails> found = nodeManager.getNodesByAddress(dnsName);
+      if (found.size() == 0) {
+        throw new InvalidHostStringException("Host " + host.getRawHostname()
+            + " (" + dnsName + ") is not running any datanodes registered"
+            + " with SCM."
+            + " Please check the host name.");
+      } else if (found.size() == 1) {
+        if (host.getPort() != -1 &&
+            !validateDNPortMatch(host.getPort(), found.get(0))) {
+          throw new InvalidHostStringException("Host " + host.getRawHostname()
+              + " is running a datanode registered with SCM,"
+              + " but the port number doesn't match."
+              + " Please check the port number.");
+        }
+        results.add(found.get(0));
+      } else if (found.size() > 1) {
+        DatanodeDetails match = null;
+        for (DatanodeDetails dn : found) {
+          if (validateDNPortMatch(host.getPort(), dn)) {
+            match = dn;
+            break;
+          }
+        }
+        if (match == null) {
+          throw new InvalidHostStringException("Host " + host.getRawHostname()
+              + " is running multiple datanodes registered with SCM,"
+              + " but no port numbers match."
+              + " Please check the port number.");
+        }
+        results.add(match);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Check if the passed port is used by the given DatanodeDetails object. If
+   * it is, return true, otherwise return false.
+   * @param port Port number to check if it is used by the datanode
+   * @param dn Datanode to check if it is using the given port
+   * @return True if port is used by the datanode. False otherwise.
+   */
+  private static boolean validateDNPortMatch(int port, DatanodeDetails dn) {
+    for (DatanodeDetails.Port p : dn.getPorts()) {
+      if (p.getValue() == port) {
+        return true;
+      }
+    }
+    return false;
+  }
+  static class HostDefinition {
+    private String rawHostname;
+    private String hostname;
+    private int port;
+
+    HostDefinition(String hostname) throws InvalidHostStringException {
+      this.rawHostname = hostname;
+      parseHostname();
+    }
+
+    public String getRawHostname() {
+      return rawHostname;
+    }
+
+    public String getHostname() {
+      return hostname;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    private void parseHostname() throws InvalidHostStringException {
+      try {
+        // A URI *must* have a scheme, so just create a fake one
+        URI uri = new URI("empty://" + rawHostname.trim());
+        this.hostname = uri.getHost();
+        this.port = uri.getPort();
+
+        if (this.hostname == null) {
+          throw new InvalidHostStringException("The string " + rawHostname +
+              " does not contain a valid hostname or hostname:port 
definition");
+        }
+      } catch (URISyntaxException e) {
+        throw new InvalidHostStringException(
+            "Unable to parse the hoststring " + rawHostname, e);
+      }
+    }
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 2dc3c5b04f..b97d87989f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -39,6 +39,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
@@ -683,6 +684,14 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
                   transferScmLeadership(
                       request.getTransferScmLeadershipRequest()))
               .build();
+      case DatanodeDiskBalancerInfo:
+        return ScmContainerLocationResponse.newBuilder()
+          .setCmdType(request.getCmdType())
+          .setStatus(Status.OK)
+          .setDatanodeDiskBalancerInfoResponse(getDatanodeDiskBalancerInfo(
+              request.getDatanodeDiskBalancerInfoRequest(),
+              request.getVersion()))
+          .build();
       default:
         throw new IllegalArgumentException(
             "Unknown command type: " + request.getCmdType());
@@ -1210,4 +1219,26 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
     impl.transferLeadership(newLeaderId);
     return TransferLeadershipResponseProto.getDefaultInstance();
   }
+
+  public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo(
+      StorageContainerLocationProtocolProtos.
+          DatanodeDiskBalancerInfoRequestProto request, int clientVersion)
+      throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> infoProtoList;
+    switch (request.getInfoType()) {
+    case report:
+      infoProtoList = impl.getDiskBalancerReport(request.getCount(),
+          clientVersion);
+      break;
+    case status:
+      infoProtoList = impl.getDiskBalancerStatus(
+          Optional.of(request.getHostsList()), clientVersion);
+      break;
+    default:
+      infoProtoList = null;
+    }
+    return DatanodeDiskBalancerInfoResponseProto.newBuilder()
+        .addAllInfo(infoProtoList)
+        .build();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index ccd4153698..7d85131d85 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1212,6 +1212,56 @@ public class SCMClientProtocolServer implements
         ContainerID.valueOf(startContainerID), count, state);
   }
 
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+    // check admin authorisation
+    try {
+      getScm().checkAdminAccess(getRemoteUser());
+    } catch (IOException e) {
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+
+    return scm.getDiskBalancerManager().getDiskBalancerReport(count,
+        clientVersion);
+  }
+
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    // check admin authorisation
+    try {
+      getScm().checkAdminAccess(getRemoteUser());
+    } catch (IOException e) {
+      LOG.error("Authorization failed", e);
+      throw e;
+    }
+
+    return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts,
+        clientVersion);
+  }
+
+  @Override
+  public void startDiskBalancer(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    // TODO: Send message to datanodes
+  }
+
+  @Override
+  public void stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException {
+    // TODO: Send message to datanodes
+  }
+
+
+  @Override
+  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    // TODO: Send message to datanodes
+  }
+
   /**
    * Queries a list of Node that match a set of statuses.
    *
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 035958a39c..665a57a37f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
 import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManagerImpl;
@@ -234,6 +235,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
   private WritableContainerFactory writableContainerFactory;
   private FinalizationManager finalizationManager;
   private HDDSLayoutVersionManager scmLayoutVersionManager;
+  private DiskBalancerManager diskBalancerManager;
 
   private SCMMetadataStore scmMetadataStore;
   private CertificateStore certificateStore;
@@ -769,6 +771,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer())
         .setRatisServer(scmHAManager.getRatisServer())
         .build();
+    diskBalancerManager = new DiskBalancerManager(conf, eventQueue, scmContext,
+        scmNodeManager);
   }
 
   /**
@@ -1840,6 +1844,10 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     return serviceManager;
   }
 
+  public DiskBalancerManager getDiskBalancerManager() {
+    return diskBalancerManager;
+  }
+
   /**
    * Force SCM out of safe mode.
    */
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
new file mode 100644
index 0000000000..541c9764b7
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Unit tests for the DiskBalancer manager.
+ */
+
+public class TestDiskBalancerManager {
+
+  private DiskBalancerManager diskBalancerManager;
+  private NodeManager nodeManager;
+  private OzoneConfiguration conf;
+  private String storageDir;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    storageDir = GenericTestUtils.getTempPath(
+        TestDiskBalancerManager.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+    nodeManager = new MockNodeManager(true, 3);
+    diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(),
+        SCMContext.emptyContext(), nodeManager);
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerReport() throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList =
+        diskBalancerManager.getDiskBalancerReport(2,
+            ClientVersion.CURRENT_VERSION);
+
+    Assertions.assertEquals(2, reportProtoList.size());
+    Assertions.assertTrue(
+        reportProtoList.get(0).getCurrentVolumeDensitySum()
+            >= reportProtoList.get(1).getCurrentVolumeDensitySum());
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerStatus() throws IOException {
+    diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0));
+
+    // Simulate users asking all status of 3 datanodes
+    List<String> dns = nodeManager.getAllNodes().stream().map(
+        DatanodeDetails::getIpAddress).collect(
+        Collectors.toList());
+
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
+        diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            ClientVersion.CURRENT_VERSION);
+
+    Assertions.assertEquals(3, statusProtoList.size());
+
+    // Simulate users asking status of 1 datanodes
+    dns = nodeManager.getAllNodes().stream().map(
+        DatanodeDetails::getIpAddress).limit(1).collect(
+        Collectors.toList());
+
+    statusProtoList =
+        diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            ClientVersion.CURRENT_VERSION);
+
+    Assertions.assertEquals(1, statusProtoList.size());
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index 7922aa13de..e60db61165 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.DatanodeAdminError;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.node.NodeUtils.HostDefinition;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -67,26 +68,24 @@ public class TestNodeDecommissionManager {
   @Test
   public void testHostStringsParseCorrectly()
       throws InvalidHostStringException {
-    NodeDecommissionManager.HostDefinition def =
-        new NodeDecommissionManager.HostDefinition("foobar");
+    HostDefinition def = new HostDefinition("foobar");
     assertEquals("foobar", def.getHostname());
     assertEquals(-1, def.getPort());
 
-    def = new NodeDecommissionManager.HostDefinition(" foobar ");
+    def = new HostDefinition(" foobar ");
     assertEquals("foobar", def.getHostname());
     assertEquals(-1, def.getPort());
 
-    def = new NodeDecommissionManager.HostDefinition("foobar:1234");
+    def = new HostDefinition("foobar:1234");
     assertEquals("foobar", def.getHostname());
     assertEquals(1234, def.getPort());
 
-    def = new NodeDecommissionManager.HostDefinition(
-        "foobar.mycompany.com:1234");
+    def = new HostDefinition("foobar.mycompany.com:1234");
     assertEquals("foobar.mycompany.com", def.getHostname());
     assertEquals(1234, def.getPort());
 
     try {
-      new NodeDecommissionManager.HostDefinition("foobar:abcd");
+      new HostDefinition("foobar:abcd");
       fail("InvalidHostStringException should have been thrown");
     } catch (InvalidHostStringException e) {
     }
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 22050f1940..1c0535128d 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -509,4 +509,40 @@ public class ContainerOperationClient implements ScmClient 
{
     return storageContainerLocationClient.queryUpgradeFinalizationProgress(
         upgradeClientID, force, readonly);
   }
+
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count) throws IOException {
+    return storageContainerLocationClient.getDiskBalancerReport(count,
+        ClientVersion.CURRENT_VERSION);
+  }
+
+  @Override
+  public void startDiskBalancer(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    storageContainerLocationClient.startDiskBalancer(threshold, bandwidth,
+        hosts);
+  }
+
+  @Override
+  public void stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException {
+    storageContainerLocationClient.stopDiskBalancer(hosts);
+  }
+
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts) throws IOException {
+    return storageContainerLocationClient.getDiskBalancerStatus(hosts,
+        ClientVersion.CURRENT_VERSION);
+  }
+
+  @Override
+  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
+      Optional<Double> bandwidth, Optional<List<String>> hosts)
+      throws IOException {
+    storageContainerLocationClient.updateDiskBalancerConfiguration(threshold,
+        bandwidth, hosts);
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
new file mode 100644
index 0000000000..267209cf03
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class tests disk balancer operations.
+ */
+public class TestDiskBalancer {
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private static ScmClient storageClient;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ozoneConf = new OzoneConfiguration();
+    ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+    cluster = 
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
+    storageClient = new ContainerOperationClient(ozoneConf);
+    cluster.waitForClusterToBeReady();
+
+    for (DatanodeDetails dn: cluster.getStorageContainerManager()
+        .getScmNodeManager().getAllNodes()) {
+      ((DatanodeInfo) dn).updateStorageReports(
+          HddsTestUtils.getRandomNodeReport(3, 1).getStorageReportList());
+    }
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerReport() throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList =
+        storageClient.getDiskBalancerReport(2);
+
+    assertEquals(2, reportProtoList.size());
+    Assert.assertTrue(
+        reportProtoList.get(0).getCurrentVolumeDensitySum()
+        >= reportProtoList.get(1).getCurrentVolumeDensitySum());
+  }
+
+  @Test
+  public void testDatanodeDiskBalancerStatus() throws IOException {
+    // TODO: Test status command with datanodes in balancing
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to