This is an automated email from the ASF dual-hosted git repository. yiyang0203 pushed a commit to branch HDDS-5713 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 07d2d7840ad9f7b7158236c10100eab78f2c2deb Author: Stephen O'Donnell <[email protected]> AuthorDate: Sun Feb 11 21:59:44 2024 +0000 HDDS-10332. [Diskbalancer] Include Disk Balancer Report in the heartbeat message (#6201) --- .../states/endpoint/HeartbeatEndpointTask.java | 7 +++ .../container/diskbalancer/DiskBalancerInfo.java | 15 ++++++ .../states/endpoint/TestHeartbeatEndpointTask.java | 61 ++++++++++------------ 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 0685650361..850f7674af 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.container.common.statemachine import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine.EndPointStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; @@ -178,6 +179,7 @@ public class HeartbeatEndpointTask addContainerActions(requestBuilder); addPipelineActions(requestBuilder); addQueuedCommandCounts(requestBuilder); + addDiskBalancerReport(requestBuilder); SCMHeartbeatRequestProto request = requestBuilder.build(); LOG.debug("Sending heartbeat message : {}", request); SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint() @@ -288,6 +290,11 @@ public class HeartbeatEndpointTask requestBuilder.setCommandQueueReport(reportProto.build()); } + private void addDiskBalancerReport(SCMHeartbeatRequestProto.Builder requestBuilder) { + DiskBalancerInfo info = context.getParent().getContainer().getDiskBalancerInfo(); + requestBuilder.setDiskBalancerReport(info.toDiskBalancerReportProto()); + } + /** * Returns a builder class for HeartbeatEndpointTask task. * @return Builder. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java index 873c89aad1..086480fa6b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java @@ -16,9 +16,12 @@ */ package org.apache.hadoop.ozone.container.diskbalancer; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import java.util.Objects; +import java.util.Optional; /** * DiskBalancer's information to persist. @@ -66,6 +69,18 @@ public class DiskBalancerInfo { } } + public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBalancerReportProto() { + DiskBalancerConfiguration conf = new DiskBalancerConfiguration(Optional.of(threshold), + Optional.of(bandwidthInMB), Optional.of(parallelThread)); + HddsProtos.DiskBalancerConfigurationProto confProto = conf.toProtobufBuilder().build(); + + StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder builder = + StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.newBuilder(); + builder.setIsRunning(shouldRun); + builder.setDiskBalancerConf(confProto); + return builder.build(); + } + public boolean isShouldRun() { return shouldRun; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index 09fa8a9917..270780c205 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -38,7 +38,6 @@ import java.util.OptionalLong; import java.util.UUID; import org.apache.hadoop.hdds.client.ECReplicationConfig; -import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -56,9 +55,12 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates; import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -70,6 +72,19 @@ public class TestHeartbeatEndpointTask { private static final InetSocketAddress TEST_SCM_ENDPOINT = new InetSocketAddress("test-scm-1", 9861); + private OzoneConfiguration conf; + private DatanodeStateMachine datanodeStateMachine; + private OzoneContainer container; + + @BeforeEach + public void setup() { + conf = new OzoneConfiguration(); + datanodeStateMachine = mock(DatanodeStateMachine.class); + container = mock(OzoneContainer.class); + when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(true, 10, 20, 30)); + when(datanodeStateMachine.getContainer()).thenReturn(container); + } + @Test public void handlesReconstructContainerCommand() throws Exception { StorageContainerDatanodeProtocolClientSideTranslatorPB scm = @@ -94,14 +109,11 @@ public class TestHeartbeatEndpointTask { .build()) .build()); - OzoneConfiguration conf = new OzoneConfiguration(); - DatanodeStateMachine datanodeStateMachine = - mock(DatanodeStateMachine.class); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, datanodeStateMachine, ""); // WHEN - HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm); + HeartbeatEndpointTask task = getHeartbeatEndpointTask(context, scm); task.call(); // THEN @@ -126,12 +138,10 @@ public class TestHeartbeatEndpointTask { .setTerm(termInSCM) .build()); - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); context.setTermOfLeaderSCM(1); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); assertTrue(heartbeat.hasDatanodeDetails()); @@ -146,9 +156,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithNodeReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -163,8 +172,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.refreshFullReport(NodeReportProto.getDefaultInstance()); endpointTask.call(); @@ -178,9 +186,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithContainerReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -195,8 +202,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); endpointTask.call(); @@ -210,9 +216,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithCommandStatusReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -227,8 +232,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.addIncrementalReport( CommandStatusReportsProto.getDefaultInstance()); @@ -243,9 +247,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithContainerActions() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -260,8 +263,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.addContainerAction(getContainerAction()); endpointTask.call(); @@ -275,9 +277,6 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithAllReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - DatanodeStateMachine datanodeStateMachine = - mock(DatanodeStateMachine.class); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, datanodeStateMachine, ""); @@ -303,8 +302,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.refreshFullReport(NodeReportProto.getDefaultInstance()); context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); @@ -326,20 +324,19 @@ public class TestHeartbeatEndpointTask { assertEquals(commands.get(queueCount.getCommand(i)).intValue(), queueCount.getCount(i)); } + assertTrue(heartbeat.hasDiskBalancerReport()); } /** * Creates HeartbeatEndpointTask with the given conf, context and * StorageContainerManager client side proxy. * - * @param conf Configuration * @param context StateContext * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB * * @return HeartbeatEndpointTask */ private HeartbeatEndpointTask getHeartbeatEndpointTask( - ConfigurationSource conf, StateContext context, StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) { DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
