This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new dc02775 HDDS-4926. Support start/stop for container balancer via
command line (#2278)
dc02775 is described below
commit dc027759ad0d22d5d52cc1ee994a9a723fbb64e8
Author: Jackson Yao <[email protected]>
AuthorDate: Wed Jun 23 19:34:39 2021 +0800
HDDS-4926. Support start/stop for container balancer via command line
(#2278)
---
.../apache/hadoop/hdds/scm/client/ScmClient.java | 21 +++
.../protocol/StorageContainerLocationProtocol.java | 21 +++
.../org/apache/hadoop/ozone/audit/SCMAction.java | 3 +
...inerLocationProtocolClientSideTranslatorPB.java | 73 +++++++++++
.../src/main/proto/ScmAdminProtocol.proto | 36 ++++++
.../scm/container/balancer/ContainerBalancer.java | 101 +++++++++++----
.../balancer/ContainerBalancerConfiguration.java | 28 ++++
...inerLocationProtocolServerSideTranslatorPB.java | 70 ++++++++++
.../scm/server/OzoneStorageContainerManager.java | 3 +
.../hdds/scm/server/SCMClientProtocolServer.java | 63 +++++++++
.../hdds/scm/server/StorageContainerManager.java | 6 +
.../container/balancer/TestContainerBalancer.java | 20 ++-
.../hdds/scm/cli/ContainerBalancerCommands.java | 108 ++++++++++++++++
.../scm/cli/ContainerBalancerStartSubcommand.java | 66 ++++++++++
.../scm/cli/ContainerBalancerStatusSubcommand.java | 45 +++++++
.../scm/cli/ContainerBalancerStopSubcommand.java | 40 ++++++
.../hdds/scm/cli/ContainerOperationClient.java | 20 +++
.../datanode/TestContainerBalancerSubCommand.java | 141 +++++++++++++++++++++
.../ozone/TestContainerBalancerOperations.java | 112 ++++++++++++++++
.../scm/ReconStorageContainerManagerFacade.java | 6 +
20 files changed, 954 insertions(+), 29 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index e5c5680..f569672 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -31,6 +31,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* The interface to call into underlying container layer.
@@ -306,6 +307,26 @@ public interface ScmClient extends Closeable {
boolean getReplicationManagerStatus() throws IOException;
/**
+ * Start ContainerBalancer.
+ *
+ * <p>Every argument is optional; an empty Optional means that no value is
+ * supplied for that setting.
+ *
+ * @param threshold balancer threshold, expected in the range [0.0, 1.0)
+ * @param idleiterations iteration count, expected to be positive, or -1
+ * to run the balancer infinitely
+ * @param maxDatanodesToBalance maximum number of datanodes to balance,
+ * expected to be positive
+ * @param maxSizeToMoveInGB maximum size to move, in GB, expected to be
+ * positive
+ * @return true if ContainerBalancer was started, false otherwise
+ * @throws IOException if the call fails
+ */
+ boolean startContainerBalancer(Optional<Double> threshold,
+ Optional<Integer> idleiterations,
+ Optional<Integer> maxDatanodesToBalance,
+ Optional<Long> maxSizeToMoveInGB) throws IOException;
+
+ /**
+ * Stop ContainerBalancer.
+ *
+ * @throws IOException if the call fails
+ */
+ void stopContainerBalancer() throws IOException;
+
+ /**
+ * Returns ContainerBalancer status.
+ *
+ * @return True if ContainerBalancer is running, false otherwise.
+ * @throws IOException if the call fails
+ */
+ boolean getContainerBalancerStatus() throws IOException;
+
+ /**
* returns the list of ratis peer roles. Currently only include peer address.
*/
List<String> getScmRatisRoles() throws IOException;
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index a2adc11..31c4472 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -36,6 +36,7 @@ import java.util.EnumSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
/**
@@ -287,6 +288,26 @@ public interface StorageContainerLocationProtocol extends
Closeable {
boolean getReplicationManagerStatus() throws IOException;
/**
+ * Start ContainerBalancer.
+ *
+ * <p>Every argument is optional; an empty Optional means that no value is
+ * supplied for that setting.
+ *
+ * @param threshold balancer threshold, expected in the range [0.0, 1.0)
+ * @param idleiterations iteration count, expected to be positive, or -1
+ * to run the balancer infinitely
+ * @param maxDatanodesToBalance maximum number of datanodes to balance,
+ * expected to be positive
+ * @param maxSizeToMoveInGB maximum size to move, in GB, expected to be
+ * positive
+ * @return true if ContainerBalancer was started, false otherwise
+ * @throws IOException if the call fails
+ */
+ boolean startContainerBalancer(Optional<Double> threshold,
+ Optional<Integer> idleiterations,
+ Optional<Integer> maxDatanodesToBalance,
+ Optional<Long> maxSizeToMoveInGB) throws IOException;
+
+ /**
+ * Stop ContainerBalancer.
+ *
+ * @throws IOException if the call fails
+ */
+ void stopContainerBalancer() throws IOException;
+
+ /**
+ * Returns ContainerBalancer status.
+ *
+ * @return True if ContainerBalancer is running, false otherwise.
+ * @throws IOException if the call fails
+ */
+ boolean getContainerBalancerStatus() throws IOException;
+
+ /**
* Get Datanode usage information by ip or uuid.
*
* @param ipaddress datanode IP address String
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index a4ae55e..9b88c6a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -44,6 +44,9 @@ public enum SCMAction implements AuditAction {
START_REPLICATION_MANAGER,
STOP_REPLICATION_MANAGER,
GET_REPLICATION_MANAGER_STATUS,
+ START_CONTAINER_BALANCER,
+ STOP_CONTAINER_BALANCER,
+ GET_CONTAINER_BALANCER_STATUS,
GET_CONTAINER_WITH_PIPELINE_BATCH,
ADD_SCM;
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 6aeb74d..b8105ca 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -57,6 +57,8 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
@@ -69,6 +71,9 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartMaintenanceNodesResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -91,6 +96,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Consumer;
import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
@@ -712,6 +718,73 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
}
+  /**
+   * Sends a start request for ContainerBalancer to SCM. Only the settings
+   * that are present are validated and copied into the request.
+   *
+   * @param threshold balancer threshold; must lie in [0.0, 1.0)
+   * @param idleiterations iteration count; must be positive, or -1 to run
+   *                       the balancer infinitely
+   * @param maxDatanodesToBalance must be positive
+   * @param maxSizeToMoveInGB must be positive
+   * @return the {@code start} flag of the SCM response
+   * @throws IllegalStateException if a supplied value is out of range
+   * @throws IOException declared by the request submission
+   */
+  @Override
+  public boolean startContainerBalancer(Optional<Double> threshold,
+      Optional<Integer> idleiterations,
+      Optional<Integer> maxDatanodesToBalance,
+      Optional<Long> maxSizeToMoveInGB) throws IOException {
+    final StartContainerBalancerRequestProto.Builder requestBuilder =
+        StartContainerBalancerRequestProto.newBuilder();
+    requestBuilder.setTraceID(TracingUtil.exportCurrentSpan());
+
+    // Balancer configuration is optional: each present value is checked
+    // and then forwarded, in the same order as before.
+    threshold.ifPresent(value -> {
+      Preconditions.checkState(value >= 0.0D && value < 1.0D,
+          "threshold should to be specified in range [0.0, 1.0).");
+      requestBuilder.setThreshold(value);
+    });
+    maxSizeToMoveInGB.ifPresent(value -> {
+      Preconditions.checkState(value > 0,
+          "maxSizeToMoveInGB must be positive.");
+      requestBuilder.setMaxSizeToMoveInGB(value);
+    });
+    maxDatanodesToBalance.ifPresent(value -> {
+      Preconditions.checkState(value > 0,
+          "maxDatanodesToBalance must be positive.");
+      requestBuilder.setMaxDatanodesToBalance(value);
+    });
+    idleiterations.ifPresent(value -> {
+      Preconditions.checkState(value > 0 || value == -1,
+          "idleiterations must be positive or" +
+              " -1(infinitly run container balancer).");
+      requestBuilder.setIdleiterations(value);
+    });
+
+    final StartContainerBalancerRequestProto startRequest =
+        requestBuilder.build();
+    return submitRequest(Type.StartContainerBalancer,
+        wrapper -> wrapper.setStartContainerBalancerRequest(startRequest))
+        .getStartContainerBalancerResponse()
+        .getStart();
+  }
+
+  /**
+   * Sends a stop request for ContainerBalancer to SCM.
+   *
+   * @throws IOException declared by the request submission
+   */
+  @Override
+  public void stopContainerBalancer() throws IOException {
+    final StopContainerBalancerRequestProto stopRequest =
+        StopContainerBalancerRequestProto.getDefaultInstance();
+    // The stop response message has no fields, so the result is not read.
+    submitRequest(Type.StopContainerBalancer,
+        wrapper -> wrapper.setStopContainerBalancerRequest(stopRequest));
+  }
+
+  /**
+   * Queries SCM for the ContainerBalancer status.
+   *
+   * @return the {@code isRunning} flag of the SCM response
+   * @throws IOException declared by the request submission
+   */
+  @Override
+  public boolean getContainerBalancerStatus() throws IOException {
+    final ContainerBalancerStatusRequestProto statusRequest =
+        ContainerBalancerStatusRequestProto.getDefaultInstance();
+    final ContainerBalancerStatusResponseProto statusResponse =
+        submitRequest(Type.GetContainerBalancerStatus,
+            wrapper -> wrapper.setContainerBalancerStatusRequest(statusRequest))
+            .getContainerBalancerStatusResponse();
+    return statusResponse.getIsRunning();
+  }
+
/**
* Builds request for datanode usage information and receives response.
*
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 5389a55..d67dce9 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -69,6 +69,9 @@ message ScmContainerLocationRequest {
optional DatanodeUsageInfoRequestProto DatanodeUsageInfoRequest = 30;
optional GetExistContainerWithPipelinesInBatchRequestProto
getExistContainerWithPipelinesInBatchRequest = 31;
optional GetContainerTokenRequestProto containerTokenRequest = 32;
+ optional StartContainerBalancerRequestProto startContainerBalancerRequest =
33;
+ optional StopContainerBalancerRequestProto stopContainerBalancerRequest = 34;
+ optional ContainerBalancerStatusRequestProto containerBalancerStatusRequest
= 35;
}
message ScmContainerLocationResponse {
@@ -109,6 +112,9 @@ message ScmContainerLocationResponse {
optional DatanodeUsageInfoResponseProto DatanodeUsageInfoResponse = 30;
optional GetExistContainerWithPipelinesInBatchResponseProto
getExistContainerWithPipelinesInBatchResponse = 31;
optional GetContainerTokenResponseProto containerTokenResponse = 32;
+ optional StartContainerBalancerResponseProto startContainerBalancerResponse
= 33;
+ optional StopContainerBalancerResponseProto stopContainerBalancerResponse =
34;
+ optional ContainerBalancerStatusResponseProto
containerBalancerStatusResponse = 35;
enum Status {
OK = 1;
@@ -147,6 +153,9 @@ enum Type {
DatanodeUsageInfo = 25;
GetExistContainerWithPipelinesInBatch = 26;
GetContainerToken = 27;
+ StartContainerBalancer = 28;
+ StopContainerBalancer = 29;
+ GetContainerBalancerStatus = 30;
}
/**
@@ -445,6 +454,33 @@ message GetContainerTokenResponseProto {
required TokenProto token = 1;
}
+/**
+ * Request to start ContainerBalancer. Every balancer setting is optional;
+ * the Java translators in this change only set the fields that the caller
+ * supplied.
+ */
+message StartContainerBalancerRequestProto {
+ optional string traceID = 1;
+ // The Java client translator accepts values in [0.0, 1.0).
+ optional double threshold = 2;
+ // The Java client translator accepts positive values, or -1 to run the
+ // balancer infinitely.
+ optional int32 idleiterations = 3;
+ // The Java client translator accepts positive values only.
+ optional int32 maxDatanodesToBalance = 4;
+ // In GB; the Java client translator accepts positive values only.
+ optional int64 maxSizeToMoveInGB = 5;
+}
+
+/**
+ * Response to a start request; start carries the boolean result returned
+ * by the server-side startContainerBalancer call.
+ */
+message StartContainerBalancerResponseProto {
+ required bool start = 1;
+}
+
+/**
+ * Request to stop ContainerBalancer.
+ */
+message StopContainerBalancerRequestProto {
+ optional string traceID = 1;
+}
+
+/**
+ * Response to a stop request; carries no fields.
+ */
+message StopContainerBalancerResponseProto {
+}
+
+/**
+ * Request for the current ContainerBalancer status.
+ */
+message ContainerBalancerStatusRequestProto {
+ optional string traceID = 1;
+}
+
+/**
+ * Response to a status request; isRunning is true when ContainerBalancer
+ * is running.
+ */
+message ContainerBalancerStatusResponseProto {
+ required bool isRunning = 1;
+}
+
/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 4b8501ce..9371bfb 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -34,6 +34,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Container balancer is a service in SCM to move containers between over- and
@@ -51,7 +53,8 @@ public class ContainerBalancer {
private final SCMContext scmContext;
private double threshold;
private int maxDatanodesToBalance;
- private long maxSizeToMove;
+ private long maxSizeToMoveInGB;
+ private int idleIteration;
private List<DatanodeUsageInfo> unBalancedNodes;
private List<DatanodeUsageInfo> overUtilizedNodes;
private List<DatanodeUsageInfo> underUtilizedNodes;
@@ -63,6 +66,8 @@ public class ContainerBalancer {
private long clusterRemaining;
private double clusterAvgUtilisation;
private final AtomicBoolean balancerRunning = new AtomicBoolean(false);
+ private Thread currentBalancingThread;
+ private Lock lock;
/**
* Constructs ContainerBalancer with the specified arguments. Initializes
@@ -96,28 +101,43 @@ public class ContainerBalancer {
this.underUtilizedNodes = new ArrayList<>();
this.unBalancedNodes = new ArrayList<>();
this.withinThresholdUtilizedNodes = new ArrayList<>();
+ this.lock = new ReentrantLock();
}
-
/**
* Starts ContainerBalancer. Current implementation is incomplete.
*
* @param balancerConfiguration Configuration values.
+   * @return true if a balancing thread was started, false if the balancer
+   *         was already running.
*/
- public boolean start(ContainerBalancerConfiguration balancerConfiguration) {
- if (!balancerRunning.compareAndSet(false, true)) {
- LOG.error("Container Balancer is already running.");
- return false;
+  public boolean start(
+      ContainerBalancerConfiguration balancerConfiguration) {
+    lock.lock();
+    try {
+      if (!balancerRunning.compareAndSet(false, true)) {
+        LOG.info("Container Balancer is already running.");
+        return false;
+      }
+
+      try {
+        this.config = balancerConfiguration;
+        this.idleIteration = config.getIdleIteration();
+        this.threshold = config.getThreshold();
+        this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
+        // NOTE(review): getMaxSizeToMove() appears to be a byte count (its
+        // default is 10 * OzoneConsts.GB) although this field is named
+        // ...InGB - confirm the intended unit.
+        this.maxSizeToMoveInGB = config.getMaxSizeToMove();
+        this.unBalancedNodes = new ArrayList<>();
+        LOG.info("Starting Container Balancer...{}", this);
+        //we should start a new balancer thread async
+        //and response to cli as soon as possible
+
+        //TODO: this is a temporary implementation
+        //modify this later
+        currentBalancingThread = new Thread(this::balance);
+        // A named thread is easy to find in thread dumps; a daemon thread
+        // cannot keep the JVM alive when SCM shuts down.
+        currentBalancingThread.setName("ContainerBalancer");
+        currentBalancingThread.setDaemon(true);
+        currentBalancingThread.start();
+      } catch (RuntimeException | Error e) {
+        // The running flag was already claimed above. Release it so that a
+        // failed start (e.g. a null configuration) does not leave the
+        // balancer permanently reported as running.
+        balancerRunning.set(false);
+        throw e;
+      }
+    } finally {
+      lock.unlock();
}
- ozoneConfiguration = new OzoneConfiguration();
- this.config = balancerConfiguration;
- this.threshold = config.getThreshold();
- this.maxDatanodesToBalance = config.getMaxDatanodesToBalance();
- this.maxSizeToMove = config.getMaxSizeToMove();
- this.unBalancedNodes = new ArrayList<>();
- LOG.info("Starting Container Balancer...{}", this);
- balance();
return true;
}
@@ -125,13 +145,18 @@ public class ContainerBalancer {
* Balances the cluster.
*/
private void balance() {
- initializeIteration();
-
- // unBalancedNodes is not cleared since the next iteration uses this
- // iteration's unBalancedNodes to find out how many nodes were balanced
- overUtilizedNodes.clear();
- underUtilizedNodes.clear();
- withinThresholdUtilizedNodes.clear();
+    // An idleIteration of -1 means "run until stopped"; a plain
+    // i < idleIteration bound would never enter the loop in that case.
+    final boolean runUntilStopped = idleIteration == -1;
+    // stop() clears balancerRunning, so re-checking it on every pass lets
+    // this thread wind down on its own instead of being killed.
+    for (int i = 0; balancerRunning.get()
+        && (runUntilStopped || i < idleIteration); i++) {
+      if (!initializeIteration()) {
+        //balancer should be stopped immediately
+        break;
+      }
+      // unBalancedNodes is not cleared since the next iteration uses this
+      // iteration's unBalancedNodes to find out how many nodes were balanced
+      overUtilizedNodes.clear();
+      underUtilizedNodes.clear();
+      withinThresholdUtilizedNodes.clear();
+    }
+    balancerRunning.compareAndSet(true, false);
}
/**
@@ -152,7 +177,6 @@ public class ContainerBalancer {
if (datanodeUsageInfos.isEmpty()) {
LOG.info("Container Balancer could not retrieve nodes from Node " +
"Manager.");
- stop();
return false;
}
@@ -221,15 +245,21 @@ public class ContainerBalancer {
maxDatanodesToBalance) {
LOG.info("Approaching Max Datanodes To Balance limit in Container " +
"Balancer. Stopping Balancer.");
- stop();
return false;
} else {
unBalancedNodes.addAll(overUtilizedNodes);
unBalancedNodes.addAll(underUtilizedNodes);
+      //for now, we just sleep to simulate the execution of balancer
+      //this is for acceptance test now. modify this later when balancer
+      //is fully completed
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        // stop() interrupts this thread to end the simulated work early.
+        // Restore the interrupt flag and report "do not continue" rather
+        // than silently carrying on with the iteration.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      /////////////////////////////
+
if (unBalancedNodes.isEmpty()) {
LOG.info("Did not find any unbalanced Datanodes.");
- stop();
return false;
} else {
LOG.info("Container Balancer has identified Datanodes that need to be"
+
@@ -323,8 +353,27 @@ public class ContainerBalancer {
* Stops ContainerBalancer.
*/
public void stop() {
- balancerRunning.set(false);
- LOG.info("Container Balancer stopped.");
+    lock.lock();
+    try {
+      //we should stop the balancer thread gracefully
+      if (!balancerRunning.get()) {
+        LOG.info("Container Balancer is not running.");
+        return;
+      }
+
+      // Thread.stop() is deprecated and inherently unsafe (it can leave
+      // shared state half-updated), so ask the balancing thread to finish
+      // instead: clear the flag it polls and interrupt its sleep.
+      balancerRunning.set(false);
+      final Thread worker = currentBalancingThread;
+      if (worker != null && worker != Thread.currentThread()) {
+        worker.interrupt();
+        // Bounded wait, in milliseconds, so a slow iteration cannot block
+        // the caller forever while the lock is held.
+        final long joinTimeoutMs = 1000L;
+        try {
+          worker.join(joinTimeoutMs);
+        } catch (InterruptedException e) {
+          // Preserve the caller's interrupt status.
+          Thread.currentThread().interrupt();
+        }
+        if (worker.isAlive()) {
+          LOG.warn("Container Balancer thread did not terminate within {} ms.",
+              joinTimeoutMs);
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+    LOG.info("Container Balancer stopped successfully.");
}
public void setNodeManager(NodeManager nodeManager) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
index 0549683..d9ae868 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java
@@ -56,6 +56,11 @@ public final class ContainerBalancerConfiguration {
"by Container Balancer.")
private long maxSizeToMove = 10 * OzoneConsts.GB;
+ // Iteration count read by ContainerBalancer when it starts. The setter
+ // setIdleIteration also accepts -1, meaning infinitely running.
+ @Config(key = "idle.iterations", type = ConfigType.INT,
+ defaultValue = "10", tags = {ConfigTag.BALANCER},
+ description = "The idle iteration count of Container Balancer")
+ private int idleIterations = 10;
+
/**
* Gets the threshold value for Container Balancer.
*
@@ -79,6 +84,29 @@ public final class ContainerBalancerConfiguration {
}
/**
+ * Gets the idle iteration value for Container Balancer.
+ *
+ * @return the idle iteration count; setIdleIteration only accepts values
+ * larger than 0, or -1 for infinitely running
+ */
+ public int getIdleIteration() {
+ return idleIterations;
+ }
+
+  /**
+   * Sets the idle iteration value for Container Balancer.
+   *
+   * @param count idle iteration count; must be larger than 0, or -1 for
+   *              infinitely running
+   * @throws IllegalArgumentException if count is 0 or smaller than -1
+   */
+  public void setIdleIteration(int count) {
+    final boolean accepted = count > 0 || count == -1;
+    if (!accepted) {
+      throw new IllegalArgumentException(
+          "Idle iteration count must be larger than 0 or " +
+              "-1(for infinitely running).");
+    }
+    this.idleIterations = count;
+  }
+
+ /**
* Gets the value of maximum number of datanodes that will be balanced by
* Container Balancer.
*
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 07c1095..2aaa3a4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -62,6 +62,8 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.RecommissionNodesResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
@@ -78,6 +80,10 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerResponseProto;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -97,6 +103,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.errorPipelineAlreadyExists;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.success;
@@ -306,6 +313,27 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.setReplicationManagerStatusResponse(getReplicationManagerStatus(
request.getSeplicationManagerStatusRequest()))
.build();
+ case StartContainerBalancer:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setStartContainerBalancerResponse(startContainerBalancer(
+ request.getStartContainerBalancerRequest()))
+ .build();
+ case StopContainerBalancer:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setStopContainerBalancerResponse(stopContainerBalancer(
+ request.getStopContainerBalancerRequest()))
+ .build();
+ case GetContainerBalancerStatus:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setContainerBalancerStatusResponse(getContainerBalancerStatus(
+ request.getContainerBalancerStatusRequest()))
+ .build();
case GetPipeline:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
@@ -611,6 +639,48 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.setIsRunning(impl.getReplicationManagerStatus()).build();
}
+  /**
+   * Unpacks a start request into optional arguments and forwards them to
+   * the protocol implementation.
+   *
+   * @param request start request; unset fields become empty Optionals
+   * @return response whose {@code start} flag is the implementation's result
+   * @throws IOException if the implementation call fails
+   */
+  public StartContainerBalancerResponseProto startContainerBalancer(
+      StartContainerBalancerRequestProto request)
+      throws IOException {
+    final Optional<Double> threshold = request.hasThreshold()
+        ? Optional.of(request.getThreshold())
+        : Optional.empty();
+    final Optional<Integer> idleiterations = request.hasIdleiterations()
+        ? Optional.of(request.getIdleiterations())
+        : Optional.empty();
+    final Optional<Integer> maxDatanodesToBalance =
+        request.hasMaxDatanodesToBalance()
+            ? Optional.of(request.getMaxDatanodesToBalance())
+            : Optional.empty();
+    final Optional<Long> maxSizeToMoveInGB = request.hasMaxSizeToMoveInGB()
+        ? Optional.of(request.getMaxSizeToMoveInGB())
+        : Optional.empty();
+
+    final boolean started = impl.startContainerBalancer(threshold,
+        idleiterations, maxDatanodesToBalance, maxSizeToMoveInGB);
+    return StartContainerBalancerResponseProto.newBuilder()
+        .setStart(started)
+        .build();
+  }
+
+  /**
+   * Forwards a stop request to the protocol implementation.
+   *
+   * @param request stop request; none of its fields are read here
+   * @return an empty stop response
+   * @throws IOException if the implementation call fails
+   */
+  public StopContainerBalancerResponseProto stopContainerBalancer(
+      StopContainerBalancerRequestProto request)
+      throws IOException {
+    impl.stopContainerBalancer();
+    final StopContainerBalancerResponseProto.Builder emptyResponse =
+        StopContainerBalancerResponseProto.newBuilder();
+    return emptyResponse.build();
+  }
+
+  /**
+   * Asks the protocol implementation whether ContainerBalancer is running.
+   *
+   * @param request status request; none of its fields are read here
+   * @return response whose {@code isRunning} flag is the implementation's
+   *         answer
+   * @throws IOException if the implementation call fails
+   */
+  public ContainerBalancerStatusResponseProto getContainerBalancerStatus(
+      ContainerBalancerStatusRequestProto request)
+      throws IOException {
+    final ContainerBalancerStatusResponseProto.Builder response =
+        ContainerBalancerStatusResponseProto.newBuilder();
+    response.setIsRunning(impl.getContainerBalancerStatus());
+    return response.build();
+  }
+
public DecommissionNodesResponseProto decommissionNodes(
DecommissionNodesRequestProto request) throws IOException {
List<DatanodeAdminError> errors =
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
index b0e1ae1..7948ffd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -50,6 +51,8 @@ public interface OzoneStorageContainerManager {
ReplicationManager getReplicationManager();
+ ContainerBalancer getContainerBalancer();
+
InetSocketAddress getDatanodeRpcAddress();
SCMNodeDetails getScmNodeDetails();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index e8e6cce..a374766 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -22,6 +22,7 @@
package org.apache.hadoop.hdds.scm.server;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ProtocolMessageEnum;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import
org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -60,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditLogger;
@@ -80,6 +83,7 @@ import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.ArrayList;
+import java.util.Optional;
import java.util.TreeSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -670,6 +674,65 @@ public class SCMClientProtocolServer implements
return scm.getReplicationManager().isRunning();
}
+  /**
+   * Starts ContainerBalancer on behalf of an admin caller. Only the settings
+   * that are present override the values of a fresh
+   * ContainerBalancerConfiguration; the outcome is written to the audit log.
+   *
+   * @param threshold balancer threshold; must lie in [0.0, 1.0)
+   * @param idleiterations iteration count; must be positive, or -1 to run
+   *                       the balancer infinitely
+   * @param maxDatanodesToBalance must be positive
+   * @param maxSizeToMoveInGB must be positive; converted with OzoneConsts.GB
+   * @return true if the balancer was started, false otherwise
+   * @throws IllegalStateException if a supplied value is out of range
+   * @throws IOException declared by the admin access check
+   */
+  @Override
+  public boolean startContainerBalancer(Optional<Double> threshold,
+      Optional<Integer> idleiterations,
+      Optional<Integer> maxDatanodesToBalance,
+      Optional<Long> maxSizeToMoveInGB) throws IOException {
+    getScm().checkAdminAccess(getRemoteUser());
+    ContainerBalancerConfiguration cbc = new ContainerBalancerConfiguration();
+    if (threshold.isPresent()) {
+      double tsd = threshold.get();
+      Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D,
+          "threshold should to be specified in range [0.0, 1.0).");
+      cbc.setThreshold(tsd);
+    }
+    if (maxSizeToMoveInGB.isPresent()) {
+      long mstm = maxSizeToMoveInGB.get();
+      Preconditions.checkState(mstm > 0,
+          "maxSizeToMoveInGB must be positive.");
+      cbc.setMaxSizeToMove(mstm * OzoneConsts.GB);
+    }
+    if (maxDatanodesToBalance.isPresent()) {
+      int mdtb = maxDatanodesToBalance.get();
+      Preconditions.checkState(mdtb > 0,
+          "maxDatanodesToBalance must be positive.");
+      cbc.setMaxDatanodesToBalance(mdtb);
+    }
+    if (idleiterations.isPresent()) {
+      int idi = idleiterations.get();
+      Preconditions.checkState(idi > 0 || idi == -1,
+          "idleiterations must be positive or" +
+          " -1(infinitly run container balancer).");
+      cbc.setIdleIteration(idi);
+    }
+
+    boolean isStartedSuccessfully = scm.getContainerBalancer().start(cbc);
+    if (isStartedSuccessfully) {
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+          SCMAction.START_CONTAINER_BALANCER, null));
+    } else {
+      // A failed start must not be audited with a success message, so build
+      // a failure record instead.
+      // NOTE(review): assumes the Auditor-style
+      // buildAuditMessageForFailure(action, map, throwable) sibling of
+      // buildAuditMessageForSuccess is available on this class - confirm.
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          SCMAction.START_CONTAINER_BALANCER, null,
+          new IOException("Container Balancer was not started.")));
+    }
+    return isStartedSuccessfully;
+  }
+
+  /**
+   * Stops ContainerBalancer on behalf of an admin caller and records the
+   * operation in the audit log.
+   *
+   * @throws IOException declared by the admin access check
+   */
+  @Override
+  public void stopContainerBalancer() throws IOException {
+    getScm().checkAdminAccess(getRemoteUser());
+    scm.getContainerBalancer().stop();
+    // Audit success only after stop() has returned, so an exception thrown
+    // while stopping is not preceded by a success record.
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+        SCMAction.STOP_CONTAINER_BALANCER, null));
+  }
+
+  /**
+   * Records the status query in the audit log and reports whether
+   * ContainerBalancer is running.
+   *
+   * @return the value of ContainerBalancer#isBalancerRunning()
+   */
+  @Override
+  public boolean getContainerBalancerStatus() {
+    AUDIT.logWriteSuccess(
+        buildAuditMessageForSuccess(
+            SCMAction.GET_CONTAINER_BALANCER_STATUS, null));
+    final boolean running = scm.getContainerBalancer().isBalancerRunning();
+    return running;
+  }
+
/**
* Get Datanode usage info such as capacity, SCMUsed, and remaining by ip
* or uuid.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1afdb95..8cd3357 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1512,6 +1512,12 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
return replicationManager;
}
+ @VisibleForTesting
+ @Override
+ public ContainerBalancer getContainerBalancer() {
+ return containerBalancer;
+ }
+
/**
* Check if the current scm is the leader and ready for accepting requests.
* @return - if the current scm is the leader and is ready.
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 4c76a26..d09f579 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -69,6 +69,7 @@ public class TestContainerBalancer {
balancerConfiguration = new ContainerBalancerConfiguration();
balancerConfiguration.setThreshold(0.1);
+ balancerConfiguration.setIdleIteration(1);
balancerConfiguration.setMaxDatanodesToBalance(10);
balancerConfiguration.setMaxSizeToMove(500 * OzoneConsts.GB);
conf.setFromObject(balancerConfiguration);
@@ -96,11 +97,18 @@ public class TestContainerBalancer {
balancerConfiguration.setThreshold(randomThreshold);
containerBalancer.start(balancerConfiguration);
+
+ // waiting for balance completed.
+ // TODO: this is a temporary implementation for now
+ // modify this after balancer is fully completed
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
+
expectedUnBalancedNodes =
determineExpectedUnBalancedNodes(randomThreshold);
unBalancedNodesAccordingToBalancer =
containerBalancer.getUnBalancedNodes();
-
Assert.assertEquals(
expectedUnBalancedNodes.size(),
unBalancedNodesAccordingToBalancer.size());
@@ -134,11 +142,17 @@ public class TestContainerBalancer {
@Test
public void containerBalancerShouldStopWhenMaxDatanodesToBalanceIsReached() {
balancerConfiguration.setMaxDatanodesToBalance(2);
- balancerConfiguration.setThreshold(0);
+ balancerConfiguration.setThreshold(0.1);
containerBalancer.start(balancerConfiguration);
+ // waiting for balance completed.
+ // TODO: this is a temporary implementation for now
+ // modify this after balancer is fully completed
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {}
+
Assert.assertFalse(containerBalancer.isBalancerRunning());
- containerBalancer.stop();
}
/**
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java
new file mode 100644
index 0000000..bc25444
--- /dev/null
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerCommands.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hdds.cli.GenericCli;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.cli.OzoneAdmin;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+
+import org.kohsuke.MetaInfServices;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.Spec;
+
+/**
+ * Subcommand to group container balancer related operations.
+ *
+ * <p>The balancer is a tool that balances datanode space usage on an Ozone
+ * cluster when some datanodes become full or when new empty nodes join
+ * the cluster. The tool can be run by the cluster administrator
+ * from command line while applications are adding and deleting blocks.
+ *
+ * <p>SYNOPSIS
+ * <pre>
+ * To start:
+ * ozone admin containerbalancer start
+ * [ -t/--threshold {@literal <threshold>}]
+ * [ -i/--idleiterations {@literal <idleiterations>}]
+ * [ -d/--maxDatanodesToBalance {@literal <maxDatanodesToBalance>}]
+ * [ -s/--maxSizeToMoveInGB {@literal <maxSizeToMoveInGB>}]
+ * Examples:
+ * ozone admin containerbalancer start
+ * start balancer with default values in the configuration
+ * ozone admin containerbalancer start -t 0.05
+ * start balancer with a threshold of 5%
+ * ozone admin containerbalancer start -i 20
+ * start balancer with maximum 20 consecutive idle iterations
+ * ozone admin containerbalancer start -i -1
+ * run balancer infinitely with default values in the configuration
+ * ozone admin containerbalancer start -d 10
+ * start balancer with maximum 10 datanodes to balance
+ * ozone admin containerbalancer start -s 10
+ * start balancer with maximum size of 10GB to move
+ * To stop:
+ * ozone admin containerbalancer stop
+ * </pre>
+ *
+ * <p>DESCRIPTION
+ * <p>The threshold parameter is a fraction in the range of (1%, 100%) with a
+ * default value of 10%. The threshold sets a target for whether the cluster
+ * is balanced. A cluster is balanced if for each datanode, the utilization
+ * of the node (ratio of used space at the node to total capacity of the node)
+ * differs from the utilization of the cluster (ratio of used space in the
+ * cluster to total capacity of the cluster) by no more than the threshold
+ * value. The smaller the threshold, the more balanced a cluster will become.
+ * It takes more time to run the balancer for small threshold values.
+ * Also for a very small threshold the cluster may not be able to reach the
+ * balanced state when applications write and delete files concurrently.
+ *
+ * <p>The administrator can interrupt the execution of the balancer at any
+ * time by running the command "ozone admin containerbalancer stop"
+ * through the command line.
+ */
+@Command(
+ name = "containerbalancer",
+ description = "ContainerBalancer specific operations",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class,
+ subcommands = {
+ ContainerBalancerStartSubcommand.class,
+ ContainerBalancerStopSubcommand.class,
+ ContainerBalancerStatusSubcommand.class
+ })
+@MetaInfServices(SubcommandWithParent.class)
+public class ContainerBalancerCommands implements Callable<Void>,
+ SubcommandWithParent {
+
+ @Spec
+ private CommandSpec spec;
+
+ @Override
+ public Void call() throws Exception {
+ GenericCli.missingSubcommand(spec);
+ return null;
+ }
+
+ @Override
+ public Class<?> getParentType() {
+ return OzoneAdmin.class;
+ }
+}
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java
new file mode 100644
index 0000000..94d9ef7
--- /dev/null
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStartSubcommand.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Handler to start container balancer.
+ */
+@Command(
+ name = "start",
+ description = "Start ContainerBalancer",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class ContainerBalancerStartSubcommand extends ScmSubcommand {
+
+ @Option(names = {"-t", "--threshold"},
+ description = "Threshold target whether the cluster is balanced")
+ private Optional<Double> threshold;
+
+ @Option(names = {"-i", "--idleiterations"},
+ description = "Maximum consecutive idle iterations")
+ private Optional<Integer> idleiterations;
+
+ @Option(names = {"-d", "--maxDatanodesToBalance"},
+ description = "Maximum datanodes to move")
+ private Optional<Integer> maxDatanodesToBalance;
+
+ @Option(names = {"-s", "--maxSizeToMoveInGB"},
+ description = "Maximum size to move in GB, " +
+ "for 10GB it should be set as 10")
+ private Optional<Long> maxSizeToMoveInGB;
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ boolean result = scmClient.startContainerBalancer(threshold,
idleiterations,
+ maxDatanodesToBalance, maxSizeToMoveInGB);
+ if (result) {
+ System.out.println("Starting ContainerBalancer Successfully.");
+ return;
+ }
+ System.out.println("ContainerBalancer is already running, " +
+ "Please stop it first.");
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java
new file mode 100644
index 0000000..e0cd436bd
--- /dev/null
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStatusSubcommand.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+
+import java.io.IOException;
+
+/**
+ * Handler to query status of container balancer.
+ */
+@Command(
+ name = "status",
+ description = "Check if ContainerBalancer is running or not",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class ContainerBalancerStatusSubcommand extends ScmSubcommand {
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ boolean execReturn = scmClient.getContainerBalancerStatus();
+ if(execReturn){
+ System.out.println("ContainerBalancer is Running.");
+ } else {
+ System.out.println("ContainerBalancer is Not Running.");
+ }
+ }
+}
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java
new file mode 100644
index 0000000..89e7680
--- /dev/null
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerBalancerStopSubcommand.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+
+import java.io.IOException;
+
+/**
+ * Handler to stop container balancer.
+ */
+@Command(
+ name = "stop",
+ description = "Stop ContainerBalancer",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class ContainerBalancerStopSubcommand extends ScmSubcommand {
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ scmClient.stopContainerBalancer();
+ System.out.println("Stopping ContainerBalancer...");
+ }
+}
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index e48a719..aeed4ef 100644
---
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
@@ -551,6 +552,25 @@ public class ContainerOperationClient implements ScmClient
{
}
@Override
+ public boolean startContainerBalancer(Optional<Double>threshold,
+ Optional<Integer> idleiterations,
+ Optional<Integer> maxDatanodesToBalance,
+ Optional<Long> maxSizeToMoveInGB) throws IOException {
+ return storageContainerLocationClient.startContainerBalancer(threshold,
+ idleiterations, maxDatanodesToBalance, maxSizeToMoveInGB);
+ }
+
+ @Override
+ public void stopContainerBalancer() throws IOException {
+ storageContainerLocationClient.stopContainerBalancer();
+ }
+
+ @Override
+ public boolean getContainerBalancerStatus() throws IOException {
+ return storageContainerLocationClient.getContainerBalancerStatus();
+ }
+
+ @Override
public List<String> getScmRatisRoles() throws IOException {
return storageContainerLocationClient.getScmInfo().getRatisPeerRoles();
}
diff --git
a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java
b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java
new file mode 100644
index 0000000..b377895
--- /dev/null
+++
b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestContainerBalancerSubCommand.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStopSubcommand;
+import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStartSubcommand;
+import org.apache.hadoop.hdds.scm.cli.ContainerBalancerStatusSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests to validate that the ContainerBalancerSubCommand class includes
the
+ * correct output when executed against a mock client.
+ */
+public class TestContainerBalancerSubCommand {
+
+ private ContainerBalancerStopSubcommand stopCmd;
+ private ContainerBalancerStartSubcommand startCmd;
+ private ContainerBalancerStatusSubcommand statusCmd;
+ private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+ private final PrintStream originalOut = System.out;
+ private final PrintStream originalErr = System.err;
+ private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
+
+ @Before
+ public void setup() throws UnsupportedEncodingException {
+ stopCmd = new ContainerBalancerStopSubcommand();
+ startCmd = new ContainerBalancerStartSubcommand();
+ statusCmd = new ContainerBalancerStatusSubcommand();
+ System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
+ System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
+ }
+
+ @After
+ public void tearDown() {
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+ }
+
+ @Test
+ public void testContainerBalancerStatusSubcommandRunning()
+ throws IOException {
+ ScmClient scmClient = mock(ScmClient.class);
+
+ //test status is running
+ Mockito.when(scmClient.getContainerBalancerStatus())
+ .thenAnswer(invocation -> true);
+
+ statusCmd.execute(scmClient);
+
+ Pattern p = Pattern.compile(
+ "^ContainerBalancer\\sis\\sRunning.");
+ Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+ assertTrue(m.find());
+ }
+
+ @Test
+ public void testContainerBalancerStatusSubcommandNotRunning()
+ throws IOException {
+ ScmClient scmClient = mock(ScmClient.class);
+
+ Mockito.when(scmClient.getContainerBalancerStatus())
+ .thenAnswer(invocation -> false);
+
+ statusCmd.execute(scmClient);
+
+ Pattern p = Pattern.compile(
+ "^ContainerBalancer\\sis\\sNot\\sRunning.");
+ Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+ assertTrue(m.find());
+ }
+
+ @Test
+ public void testContainerBalancerStopSubcommand() throws IOException {
+ ScmClient scmClient = mock(ScmClient.class);
+ stopCmd.execute(scmClient);
+
+ Pattern p = Pattern.compile("^Stopping\\sContainerBalancer...");
+ Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+ assertTrue(m.find());
+ }
+
+ @Test
+ public void testContainerBalancerStartSubcommandWhenBalancerIsNotRunning()
+ throws IOException {
+ ScmClient scmClient = mock(ScmClient.class);
+ Mockito.when(scmClient.startContainerBalancer(null, null, null, null))
+ .thenAnswer(invocation -> true);
+ startCmd.execute(scmClient);
+
+ Pattern p = Pattern.compile("^Starting\\sContainerBalancer" +
+ "\\sSuccessfully.");
+ Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+ assertTrue(m.find());
+ }
+
+ @Test
+ public void testContainerBalancerStartSubcommandWhenBalancerIsRunning()
+ throws IOException {
+ ScmClient scmClient = mock(ScmClient.class);
+ Mockito.when(scmClient.startContainerBalancer(null, null, null, null))
+ .thenAnswer(invocation -> false);
+ startCmd.execute(scmClient);
+
+ Pattern p = Pattern.compile("^ContainerBalancer\\sis\\salready\\srunning,"
+
+ "\\sPlease\\sstop\\sit\\sfirst.");
+ Matcher m = p.matcher(outContent.toString(DEFAULT_ENCODING));
+ assertTrue(m.find());
+ }
+
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java
new file mode 100644
index 0000000..7e52520
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerBalancerOperations.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests container balancer operations
+ * from SCM clients.
+ */
+public class TestContainerBalancerOperations {
+
+ /**
+ * Set a timeout for each test.
+ */
+ @Rule
+ public Timeout timeout = Timeout.seconds(300);
+
+ private static ScmClient containerBalancerClient;
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration ozoneConf;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ozoneConf = new OzoneConfiguration();
+ ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+ cluster =
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
+ containerBalancerClient = new ContainerOperationClient(ozoneConf);
+ cluster.waitForClusterToBeReady();
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ if(cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * test container balancer operation with {@link ContainerOperationClient}.
+ * @throws Exception
+ */
+ @Test
+ public void testContainerBalancerCLIOperations() throws Exception {
+ // test normally start and stop
+ boolean running = containerBalancerClient.getContainerBalancerStatus();
+ assertFalse(running);
+ Optional<Double> threshold = Optional.of(0.1);
+ Optional<Integer> idleiterations = Optional.of(10000);
+ Optional<Integer> maxDatanodesToBalance = Optional.of(1);
+ Optional<Long> maxSizeToMoveInGB = Optional.of(1L);
+
+ containerBalancerClient.startContainerBalancer(threshold, idleiterations,
+ maxDatanodesToBalance, maxSizeToMoveInGB);
+ running = containerBalancerClient.getContainerBalancerStatus();
+ assertTrue(running);
+
+ // waiting for balance completed.
+ // TODO: this is a temporary implementation for now
+ // modify this after balancer is fully completed
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {}
+
+ running = containerBalancerClient.getContainerBalancerStatus();
+ assertFalse(running);
+
+ // test normally start , and stop it before balance is completed
+ containerBalancerClient.startContainerBalancer(threshold, idleiterations,
+ maxDatanodesToBalance, maxSizeToMoveInGB);
+ running = containerBalancerClient.getContainerBalancerStatus();
+ assertTrue(running);
+
+ containerBalancerClient.stopContainerBalancer();
+ running = containerBalancerClient.getContainerBalancerStatus();
+ assertFalse(running);
+ }
+
+ //TODO: add more acceptance after container balancer is fully completed
+}
\ No newline at end of file
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index ebef4d8..652ab6e 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -34,6 +34,7 @@ import
org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -323,6 +324,11 @@ public class ReconStorageContainerManagerFacade
}
@Override
+ public ContainerBalancer getContainerBalancer() {
+ return null;
+ }
+
+ @Override
public InetSocketAddress getDatanodeRpcAddress() {
return getDatanodeProtocolServer().getDatanodeRpcAddress();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]