This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new 15328df44e HDDS-10332. [Diskbalancer] Include Disk Balancer Report in
the heartbeat message (#6201)
15328df44e is described below
commit 15328df44e7a3eb50fa62a9893414eb0b721aa63
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Sun Feb 11 21:59:44 2024 +0000
HDDS-10332. [Diskbalancer] Include Disk Balancer Report in the heartbeat
message (#6201)
---
.../states/endpoint/HeartbeatEndpointTask.java | 7 +++
.../container/diskbalancer/DiskBalancerInfo.java | 15 ++++++
.../states/endpoint/TestHeartbeatEndpointTask.java | 61 ++++++++++------------
3 files changed, 51 insertions(+), 32 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 0685650361..850f7674af 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
@@ -178,6 +179,7 @@ public class HeartbeatEndpointTask
addContainerActions(requestBuilder);
addPipelineActions(requestBuilder);
addQueuedCommandCounts(requestBuilder);
+ addDiskBalancerReport(requestBuilder);
SCMHeartbeatRequestProto request = requestBuilder.build();
LOG.debug("Sending heartbeat message : {}", request);
SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
@@ -288,6 +290,11 @@ public class HeartbeatEndpointTask
requestBuilder.setCommandQueueReport(reportProto.build());
}
+ private void addDiskBalancerReport(SCMHeartbeatRequestProto.Builder
requestBuilder) {
+ DiskBalancerInfo info =
context.getParent().getContainer().getDiskBalancerInfo();
+ requestBuilder.setDiskBalancerReport(info.toDiskBalancerReportProto());
+ }
+
/**
* Returns a builder class for HeartbeatEndpointTask task.
* @return Builder.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
index 873c89aad1..086480fa6b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -16,9 +16,12 @@
*/
package org.apache.hadoop.ozone.container.diskbalancer;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import java.util.Objects;
+import java.util.Optional;
/**
* DiskBalancer's information to persist.
@@ -66,6 +69,18 @@ public class DiskBalancerInfo {
}
}
+ public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto
toDiskBalancerReportProto() {
+ DiskBalancerConfiguration conf = new
DiskBalancerConfiguration(Optional.of(threshold),
+ Optional.of(bandwidthInMB), Optional.of(parallelThread));
+ HddsProtos.DiskBalancerConfigurationProto confProto =
conf.toProtobufBuilder().build();
+
+ StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder
builder =
+
StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.newBuilder();
+ builder.setIsRunning(shouldRun);
+ builder.setDiskBalancerConf(confProto);
+ return builder.build();
+ }
+
public boolean isShouldRun() {
return shouldRun;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 09fa8a9917..270780c205 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -38,7 +38,6 @@ import java.util.OptionalLong;
import java.util.UUID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -56,9 +55,12 @@ import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -70,6 +72,19 @@ public class TestHeartbeatEndpointTask {
private static final InetSocketAddress TEST_SCM_ENDPOINT =
new InetSocketAddress("test-scm-1", 9861);
+ private OzoneConfiguration conf;
+ private DatanodeStateMachine datanodeStateMachine;
+ private OzoneContainer container;
+
+ @BeforeEach
+ public void setup() {
+ conf = new OzoneConfiguration();
+ datanodeStateMachine = mock(DatanodeStateMachine.class);
+ container = mock(OzoneContainer.class);
+ when(container.getDiskBalancerInfo()).thenReturn(new
DiskBalancerInfo(true, 10, 20, 30));
+ when(datanodeStateMachine.getContainer()).thenReturn(container);
+ }
+
@Test
public void handlesReconstructContainerCommand() throws Exception {
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
@@ -94,14 +109,11 @@ public class TestHeartbeatEndpointTask {
.build())
.build());
- OzoneConfiguration conf = new OzoneConfiguration();
- DatanodeStateMachine datanodeStateMachine =
- mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
datanodeStateMachine, "");
// WHEN
- HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
+ HeartbeatEndpointTask task = getHeartbeatEndpointTask(context, scm);
task.call();
// THEN
@@ -126,12 +138,10 @@ public class TestHeartbeatEndpointTask {
.setTerm(termInSCM)
.build());
- OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
context.setTermOfLeaderSCM(1);
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context,
scm);
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
assertTrue(heartbeat.hasDatanodeDetails());
@@ -146,9 +156,8 @@ public class TestHeartbeatEndpointTask {
@Test
public void testheartbeatWithNodeReports() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
@@ -163,8 +172,7 @@ public class TestHeartbeatEndpointTask {
.getDatanodeDetails().getUuid())
.build());
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context,
scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
context.refreshFullReport(NodeReportProto.getDefaultInstance());
endpointTask.call();
@@ -178,9 +186,8 @@ public class TestHeartbeatEndpointTask {
@Test
public void testheartbeatWithContainerReports() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
@@ -195,8 +202,7 @@ public class TestHeartbeatEndpointTask {
.getDatanodeDetails().getUuid())
.build());
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context,
scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
endpointTask.call();
@@ -210,9 +216,8 @@ public class TestHeartbeatEndpointTask {
@Test
public void testheartbeatWithCommandStatusReports() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
@@ -227,8 +232,7 @@ public class TestHeartbeatEndpointTask {
.getDatanodeDetails().getUuid())
.build());
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context,
scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
context.addIncrementalReport(
CommandStatusReportsProto.getDefaultInstance());
@@ -243,9 +247,8 @@ public class TestHeartbeatEndpointTask {
@Test
public void testheartbeatWithContainerActions() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
@@ -260,8 +263,7 @@ public class TestHeartbeatEndpointTask {
.getDatanodeDetails().getUuid())
.build());
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context,
scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
context.addContainerAction(getContainerAction());
endpointTask.call();
@@ -275,9 +277,6 @@ public class TestHeartbeatEndpointTask {
@Test
public void testheartbeatWithAllReports() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
- DatanodeStateMachine datanodeStateMachine =
- mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
datanodeStateMachine, "");
@@ -303,8 +302,7 @@ public class TestHeartbeatEndpointTask {
.getDatanodeDetails().getUuid())
.build());
- HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
- conf, context, scm);
+ HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context,
scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
context.refreshFullReport(NodeReportProto.getDefaultInstance());
context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
@@ -326,20 +324,19 @@ public class TestHeartbeatEndpointTask {
assertEquals(commands.get(queueCount.getCommand(i)).intValue(),
queueCount.getCount(i));
}
+ assertTrue(heartbeat.hasDiskBalancerReport());
}
/**
* Creates HeartbeatEndpointTask with the given context and
* StorageContainerManager client side proxy.
*
- * @param conf Configuration
* @param context StateContext
* @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
*
* @return HeartbeatEndpointTask
*/
private HeartbeatEndpointTask getHeartbeatEndpointTask(
- ConfigurationSource conf,
StateContext context,
StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]