This is an automated email from the ASF dual-hosted git repository. yiyang0203 pushed a commit to branch HDDS-5713 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 9014a79df83946c8177a3ee26daab25d01fea519 Author: Symious <[email protected]> AuthorDate: Wed Sep 7 12:19:57 2022 +0800 HDDS-7155. [DiskBalancer] Create interface between SCM and DN (#3701) --- .../org/apache/hadoop/hdds/HddsConfigKeys.java | 4 + .../apache/hadoop/hdds/scm/client/ScmClient.java | 21 ++-- .../protocol/StorageContainerLocationProtocol.java | 17 +-- .../scm/storage/DiskBalancerConfiguration.java | 39 +++---- .../common/src/main/resources/ozone-default.xml | 8 ++ .../common/report/DiskBalancerReportPublisher.java | 66 ++++++++++++ .../common/report/ReportPublisherFactory.java | 3 + .../common/statemachine/DatanodeStateMachine.java | 2 + .../common/statemachine/StateContext.java | 13 +++ .../commandhandler/DiskBalancerCommandHandler.java | 117 +++++++++++++++++++++ .../states/endpoint/HeartbeatEndpointTask.java | 7 ++ .../ozone/container/ozoneimpl/OzoneContainer.java | 5 + .../protocol/commands/DiskBalancerCommand.java | 74 +++++++++++++ .../common/report/TestReportPublisher.java | 42 ++++++++ .../common/report/TestReportPublisherFactory.java | 12 +++ ...inerLocationProtocolClientSideTranslatorPB.java | 81 ++++++++++++-- .../src/main/proto/ScmAdminProtocol.proto | 20 ++++ .../interface-client/src/main/proto/hdds.proto | 10 +- .../proto/ScmServerDatanodeHeartbeatProtocol.proto | 17 +++ .../apache/hadoop/hdds/scm/events/SCMEvents.java | 9 ++ .../hadoop/hdds/scm/node/DiskBalancerManager.java | 109 +++++++++++++------ .../hdds/scm/node/DiskBalancerReportHandler.java | 65 ++++++++++++ .../hadoop/hdds/scm/node/DiskBalancerStatus.java | 2 - .../apache/hadoop/hdds/scm/node/NodeManager.java | 8 ++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 17 +++ ...inerLocationProtocolServerSideTranslatorPB.java | 51 ++++++++- .../hdds/scm/server/SCMClientProtocolServer.java | 22 ++-- .../scm/server/SCMDatanodeHeartbeatDispatcher.java | 23 ++++ .../hdds/scm/server/StorageContainerManager.java | 5 + .../hadoop/hdds/scm/container/MockNodeManager.java | 51 ++++++++- 
.../hdds/scm/container/SimpleMockNodeManager.java | 5 + .../hdds/scm/node/TestDiskBalancerManager.java | 34 ++++++ .../testutils/ReplicationNodeManagerMock.java | 6 ++ .../hdds/scm/cli/ContainerOperationClient.java | 29 ++--- .../hadoop/ozone/scm/node/TestDiskBalancer.java | 4 + 35 files changed, 902 insertions(+), 96 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index d0c31bf288..b3b5f103d1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -46,6 +46,10 @@ public final class HddsConfigKeys { "hdds.pipeline.report.interval"; public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT = "60s"; + public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL = + "hdds.disk.balancer.report.interval"; + public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT = + "60s"; public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL = "hdds.command.status.report.interval"; public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT = diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 478d08dff0..2fd91fbc43 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -456,7 +456,8 @@ public interface ScmClient extends Closeable { String getMetrics(String query) throws IOException; /** - * Get DiskBalancer status. + * Get DiskBalancer report. + * REPORT shows the current volume density of datanodes. * @param count top datanodes that need balancing * @return List of DatanodeDiskBalancerInfo. 
* @throws IOException @@ -466,33 +467,39 @@ public interface ScmClient extends Closeable { /** * Get DiskBalancer status. + * STATUS shows the running status of DiskBalancer on datanodes. * @param hosts If hosts is not null, return status of hosts; If hosts is * null, return status of all datanodes in balancing. * @return List of DatanodeDiskBalancerInfo. * @throws IOException */ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( - Optional<List<String>> hosts) throws IOException; + Optional<List<String>> hosts, + Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus) + throws IOException; /** * Start DiskBalancer. */ - void startDiskBalancer( + List<DatanodeAdminError> startDiskBalancer( Optional<Double> threshold, - Optional<Double> bandwidth, + Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException; /** * Stop DiskBalancer. */ - void stopDiskBalancer(Optional<List<String>> hosts) throws IOException; + List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts) + throws IOException; /** * Update DiskBalancer Configuration. 
*/ - void updateDiskBalancerConfiguration( + List<DatanodeAdminError> updateDiskBalancerConfiguration( Optional<Double> threshold, - Optional<Double> bandwidth, + Optional<Long> bandwidth, + Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 2a7969049d..651c9905d7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -484,26 +484,31 @@ public interface StorageContainerLocationProtocol extends Closeable { * Get DiskBalancer status. */ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( - Optional<List<String>> hosts, int clientVersion) throws IOException; + Optional<List<String>> hosts, + Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus, + int clientVersion) throws IOException; /** * Start DiskBalancer. */ - void startDiskBalancer( + List<DatanodeAdminError> startDiskBalancer( Optional<Double> threshold, - Optional<Double> bandwidth, + Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException; /** * Stop DiskBalancer. */ - void stopDiskBalancer(Optional<List<String>> hosts) throws IOException; + List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts) + throws IOException; /** * Update DiskBalancer Configuration. 
*/ - void updateDiskBalancerConfiguration( + List<DatanodeAdminError> updateDiskBalancerConfiguration( Optional<Double> threshold, - Optional<Double> bandwidth, + Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java index 54709e4c17..e460e0b6eb 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.ConfigType; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +36,7 @@ public final class DiskBalancerConfiguration { private static final Logger LOG = LoggerFactory.getLogger(DiskBalancerConfiguration.class); - @Config(key = "volume.density.threshold", type = ConfigType.AUTO, + @Config(key = "volume.density.threshold", type = ConfigType.DOUBLE, defaultValue = "10", tags = {ConfigTag.DISKBALANCER}, description = "Threshold is a percentage in the range of 0 to 100. 
A " + "datanode is considered balanced if for each volume, the " + @@ -45,12 +45,12 @@ public final class DiskBalancerConfiguration { " of the entire datanode) no more than the threshold.") private double threshold = 10d; - @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO, + @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.LONG, defaultValue = "10", tags = {ConfigTag.DISKBALANCER}, description = "The max balance speed.") - private double diskBandwidth = 10; + private long diskBandwidthInMB = 10; - @Config(key = "parallel.thread", type = ConfigType.AUTO, + @Config(key = "parallel.thread", type = ConfigType.INT, defaultValue = "5", tags = {ConfigTag.DISKBALANCER}, description = "The max parallel balance thread count.") private int parallelThread = 5; @@ -86,21 +86,21 @@ public final class DiskBalancerConfiguration { * * @return max disk bandwidth per second */ - public double getDiskBandwidth() { - return diskBandwidth; + public double getDiskBandwidthInMB() { + return diskBandwidthInMB; } /** * Sets the disk bandwidth value for Disk Balancer. 
* - * @param diskBandwidth the bandwidth to control balance speed + * @param diskBandwidthInMB the bandwidth to control balance speed */ - public void setDiskBandwidth(double diskBandwidth) { - if (diskBandwidth <= 0d) { + public void setDiskBandwidthInMB(long diskBandwidthInMB) { + if (diskBandwidthInMB <= 0L) { throw new IllegalArgumentException( - "diskBandwidth must be a value larger than 0."); + "diskBandwidthInMB must be a value larger than 0."); } - this.diskBandwidth = diskBandwidth; + this.diskBandwidthInMB = diskBandwidthInMB; } /** @@ -124,6 +124,7 @@ public final class DiskBalancerConfiguration { } this.parallelThread = parallelThread; } + @Override public String toString() { return String.format("Disk Balancer Configuration values:%n" + @@ -132,7 +133,7 @@ public final class DiskBalancerConfiguration { "%-50s %s%n" + "%-50s %s%n", "Key", "Value", - "Threshold", threshold, "Max disk bandwidth", diskBandwidth, + "Threshold", threshold, "Max disk bandwidth", diskBandwidthInMB, "Parallel Thread", parallelThread); } @@ -141,21 +142,21 @@ public final class DiskBalancerConfiguration { HddsProtos.DiskBalancerConfigurationProto.newBuilder(); builder.setThreshold(threshold) - .setDiskBandwidth(diskBandwidth) + .setDiskBandwidthInMB(diskBandwidthInMB) .setParallelThread(parallelThread); return builder; } - static DiskBalancerConfiguration fromProtobuf( + public static DiskBalancerConfiguration fromProtobuf( @Nonnull HddsProtos.DiskBalancerConfigurationProto proto, - @Nonnull OzoneConfiguration ozoneConfiguration) { + @Nonnull ConfigurationSource configurationSource) { DiskBalancerConfiguration config = - ozoneConfiguration.getObject(DiskBalancerConfiguration.class); + configurationSource.getObject(DiskBalancerConfiguration.class); if (proto.hasThreshold()) { config.setThreshold(proto.getThreshold()); } - if (proto.hasDiskBandwidth()) { - config.setDiskBandwidth(proto.getDiskBandwidth()); + if (proto.hasDiskBandwidthInMB()) { + 
config.setDiskBandwidthInMB(proto.getDiskBandwidthInMB()); } if (proto.hasParallelThread()) { config.setParallelThread(proto.getParallelThread()); diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index fc873f20af..e59e29257c 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -386,6 +386,14 @@ datanode periodically send pipeline report to SCM. Unit could be defined with postfix (ns,ms,s,m,h,d)</description> </property> + <property> + <name>hdds.disk.balancer.report.interval</name> + <value>60000ms</value> + <tag>OZONE, CONTAINER, DISK_BALANCER</tag> + <description>Time interval of the datanode to send disk balancer report. Each + datanode periodically sends disk balancer report to SCM. Unit could be + defined with postfix (ns,ms,s,m,h,d)</description> + </property> <property> diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java new file mode 100644 index 0000000000..2bb78c1ee6 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; +import org.apache.hadoop.hdds.utils.HddsServerUtil; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT; + + +/** + * Publishes DiskBalancer report which will be sent to SCM as part of heartbeat. 
+ * DiskBalancer Report consist of the following information: + * - isBalancerRunning + * - balancedBytes + * - DiskBalancerConfiguration + */ +public class DiskBalancerReportPublisher extends + ReportPublisher<DiskBalancerReportProto> { + + private Long diskBalancerReportInterval = null; + + @Override + protected long getReportFrequency() { + if (diskBalancerReportInterval == null) { + diskBalancerReportInterval = getConf().getTimeDuration( + HDDS_DISK_BALANCER_REPORT_INTERVAL, + HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval( + getConf()); + + Preconditions.checkState( + heartbeatFrequency <= diskBalancerReportInterval, + HDDS_DISK_BALANCER_REPORT_INTERVAL + + " cannot be configured lower than heartbeat frequency " + + heartbeatFrequency + "."); + } + return diskBalancerReportInterval; + } + + @Override + protected DiskBalancerReportProto getReport() { + return getContext().getParent().getContainer().getDiskBalancerReport(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java index 3be1b5e077..19d2806919 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; @@ -55,6 +56,8 @@ public class ReportPublisherFactory { report2publisher.put(PipelineReportsProto.class, PipelineReportPublisher.class); report2publisher.put(CRLStatusReport.class, CRLStatusReportPublisher.class); + report2publisher.put(DiskBalancerReportProto.class, + DiskBalancerReportPublisher.class); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 9677144054..1623c1ae28 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -55,6 +55,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Comm import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CreatePipelineCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DiskBalancerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler; import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler; @@ -258,6 +259,7 @@ public class DatanodeStateMachine implements Closeable { supervisor::nodeStateUpdated)) .addHandler(new FinalizeNewLayoutVersionCommandHandler()) .addHandler(new RefreshVolumeUsageCommandHandler()) + .addHandler(new DiskBalancerCommandHandler()) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 6bbf8e4794..70fafa2c84 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; @@ -98,6 +99,9 @@ public class StateContext { @VisibleForTesting static final String CRL_STATUS_REPORT_PROTO_NAME = CRLStatusReport.getDescriptor().getFullName(); + 
@VisibleForTesting + static final String DISK_BALANCER_REPORT_PROTO_NAME = + DiskBalancerReportProto.getDescriptor().getFullName(); static final Logger LOG = LoggerFactory.getLogger(StateContext.class); @@ -113,6 +117,7 @@ public class StateContext { private final AtomicReference<Message> nodeReport; private final AtomicReference<Message> pipelineReports; private final AtomicReference<Message> crlStatusReport; + private final AtomicReference<Message> diskBalancerReport; // Incremental reports are queued in the map below private final Map<InetSocketAddress, List<Message>> incrementalReportsQueue; @@ -180,6 +185,7 @@ public class StateContext { nodeReport = new AtomicReference<>(); pipelineReports = new AtomicReference<>(); crlStatusReport = new AtomicReference<>(); // Certificate Revocation List + diskBalancerReport = new AtomicReference<>(); endpoints = new HashSet<>(); containerActions = new HashMap<>(); pipelineActions = new HashMap<>(); @@ -206,6 +212,8 @@ public class StateContext { type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports); fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME); type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport); + fullReportTypeList.add(DISK_BALANCER_REPORT_PROTO_NAME); + type2Reports.put(DISK_BALANCER_REPORT_PROTO_NAME, diskBalancerReport); } /** @@ -958,6 +966,11 @@ public class StateContext { return crlStatusReport.get(); } + @VisibleForTesting + public Message getDiskBalancerReport() { + return diskBalancerReport.get(); + } + public void configureReconHeartbeatFrequency() { reconHeartbeatFrequency.set(getReconHeartbeatInterval(conf)); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java new file mode 100644 index 0000000000..37f275dbb4 --- /dev/null +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Handler for DiskBalancer command received from SCM. 
+ */ +public class DiskBalancerCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerCommandHandler.class); + + private AtomicLong invocationCount = new AtomicLong(0); + private long totalTime; + + /** + * Constructs a diskBalancerCommand handler. + */ + public DiskBalancerCommandHandler() { + } + + /** + * Handles a given SCM command. + * + * @param command - SCM Command + * @param ozoneContainer - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. + */ + @Override + public void handle(SCMCommand command, OzoneContainer ozoneContainer, + StateContext context, SCMConnectionManager connectionManager) { + invocationCount.incrementAndGet(); + // TODO: Do start/stop/update operation + } + + /** + * Returns the command type that this command handler handles. + * + * @return Type + */ + @Override + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.diskBalancerCommand; + } + + /** + * Returns number of times this handler has been invoked. + * + * @return int + */ + @Override + public int getInvocationCount() { + return (int)invocationCount.get(); + } + + /** + * Returns the average time this function takes to run. 
+ * + * @return long + */ + @Override + public long getAverageRunTime() { + if (invocationCount.get() > 0) { + return totalTime / invocationCount.get(); + } + return 0; + } + + @Override + public long getTotalRunTime() { + return totalTime; + } + + @Override + public int getQueuedCount() { + return 0; + } + + private void startDiskBalancer(DiskBalancerConfiguration configuration) { + // Todo: add implementation to start DiskBalancer + } + + private void stopDiskBalancer() { + // Todo: add implementation to stop DiskBalancer + } + + private void updateDiskBalancer(DiskBalancerConfiguration + configuration) { + // Todo: add implementation to update diskBalancer configuration + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 44f0eae49e..0685650361 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -54,6 +54,7 @@ import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand; import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand; @@ -416,6 +417,12 @@ public class HeartbeatEndpointTask commandResponseProto.getRefreshVolumeUsageCommandProto()); 
processCommonCommand(commandResponseProto, refreshVolumeUsageCommand); break; + case diskBalancerCommand: + DiskBalancerCommand diskBalancerCommand = + DiskBalancerCommand.getFromProtobuf( + commandResponseProto.getDiskBalancerCommandProto(), conf); + processCommonCommand(commandResponseProto, diskBalancerCommand); + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCommandType().name()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index aef3965dcd..f81b58e317 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.security.SecurityConfig; @@ -590,4 +591,8 @@ public class OzoneContainer { return replicationServer; } + public DiskBalancerReportProto getDiskBalancerReport() { + // TODO: Return real disk balancer report + return null; + } } diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java new file mode 100644 index 0000000000..ea97f34ca0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; + +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerCommandProto; + +/** + * Informs a datanode to update DiskBalancer status. 
+ */ +public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> { + + private final boolean shouldRun; + private final DiskBalancerConfiguration diskBalancerConfiguration; + + public DiskBalancerCommand(final boolean shouldRun, + final DiskBalancerConfiguration diskBalancerConfiguration) { + this.shouldRun = shouldRun; + this.diskBalancerConfiguration = diskBalancerConfiguration; + } + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.diskBalancerCommand; + } + + @Override + public DiskBalancerCommandProto getProto() { + return DiskBalancerCommandProto.newBuilder() + .setShouldRun(shouldRun) + .setDiskBalancerConf(diskBalancerConfiguration.toProtobufBuilder()) + .build(); + } + + public static DiskBalancerCommand getFromProtobuf(DiskBalancerCommandProto + diskbalancerCommandProto, ConfigurationSource configuration) { + Preconditions.checkNotNull(diskbalancerCommandProto); + return new DiskBalancerCommand(diskbalancerCommandProto.getShouldRun(), + DiskBalancerConfiguration.fromProtobuf( + diskbalancerCommandProto.getDiskBalancerConf(), configuration)); + } + + public boolean isShouldRun() { + return shouldRun; + } + + public DiskBalancerConfiguration getDiskBalancerConfiguration() { + return diskBalancerConfiguration; + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java index 60404fa36b..2f38e87fed 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -31,7 +31,9 @@ import org.apache.hadoop.hdds.HddsIdFactory; import 
org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CommandStatus.Status; import org.apache.hadoop.hdds.protocol.proto. @@ -39,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto. import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.jupiter.api.BeforeAll; @@ -217,6 +220,45 @@ public class TestReportPublisher { executorService.shutdown(); } + @Test + public void testDiskBalancerReportPublisher() throws IOException { + StateContext dummyContext = mock(StateContext.class); + DatanodeStateMachine dummyStateMachine = + mock(DatanodeStateMachine.class); + OzoneContainer dummyContainer = mock(OzoneContainer.class); + + DiskBalancerReportProto.Builder builder = + DiskBalancerReportProto.newBuilder(); + builder.setIsRunning(true); + builder.setBalancedBytes(1L); + builder.setDiskBalancerConf( + HddsProtos.DiskBalancerConfigurationProto.newBuilder().build()); + DiskBalancerReportProto dummyReport = builder.build(); + + ReportPublisher publisher = new DiskBalancerReportPublisher(); + when(dummyContext.getParent()).thenReturn(dummyStateMachine); + when(dummyStateMachine.getContainer()).thenReturn(dummyContainer); + 
when(dummyContainer.getDiskBalancerReport()).thenReturn(dummyReport); + publisher.setConf(config); + + ScheduledExecutorService executorService = HadoopExecutors + .newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Unit test ReportManager Thread - %d").build()); + publisher.init(dummyContext, executorService); + Message report = + ((DiskBalancerReportPublisher) publisher).getReport(); + assertNotNull(report); + for (Descriptors.FieldDescriptor descriptor : + report.getDescriptorForType().getFields()) { + if (descriptor.getNumber() == + DiskBalancerReportProto.ISRUNNING_FIELD_NUMBER) { + assertEquals(true, report.getField(descriptor)); + } + } + executorService.shutdown(); + } + /** * Get a datanode details. * diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java index 44e1389bf7..1ebb05218d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.junit.jupiter.api.Test; @@ -66,6 +67,17 @@ public class TestReportPublisherFactory { assertEquals(conf, publisher.getConf()); } + @Test + public void 
testGetDiskBalancerReportPublisher() { + OzoneConfiguration conf = new OzoneConfiguration(); + ReportPublisherFactory factory = new ReportPublisherFactory(conf); + ReportPublisher publisher = factory + .getPublisherFor(DiskBalancerReportProto.class); + assertEquals(DiskBalancerReportPublisher.class, + publisher.getClass()); + assertEquals(conf, publisher.getConf()); + } + @Test public void testInvalidReportPublisher() { OzoneConfiguration conf = new OzoneConfiguration(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index c1a48cf18f..91e6f5578d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -50,6 +50,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpType; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto; @@ -1135,11 +1138,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB @Override public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( - Optional<List<String>> hosts, int clientVersion) throws IOException { + Optional<List<String>> hosts, + Optional<HddsProtos.DiskBalancerRunningStatus> status, + int clientVersion) throws IOException { DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder = DatanodeDiskBalancerInfoRequestProto.newBuilder() .setInfoType(DatanodeDiskBalancerInfoType.status); hosts.ifPresent(requestBuilder::addAllHosts); + status.ifPresent(requestBuilder::setStatus); DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build(); DatanodeDiskBalancerInfoResponseProto response = @@ -1151,22 +1157,83 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB } @Override - public void startDiskBalancer(Optional<Double> threshold, - Optional<Double> bandwidth, Optional<List<String>> hosts) - throws IOException { + public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold, + Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, + Optional<List<String>> hosts) throws IOException { + HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = + HddsProtos.DiskBalancerConfigurationProto.newBuilder(); + threshold.ifPresent(confBuilder::setThreshold); + bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB); + parallelThread.ifPresent(confBuilder::setParallelThread); + + DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = + DatanodeDiskBalancerOpRequestProto.newBuilder() + 
.setOpType(DatanodeDiskBalancerOpType.start) + .setConf(confBuilder); + hosts.ifPresent(requestBuilder::addAllHosts); + DatanodeDiskBalancerOpResponseProto response = + submitRequest(Type.DatanodeDiskBalancerOp, + builder -> builder.setDatanodeDiskBalancerOpRequest( + requestBuilder.build())) + .getDatanodeDiskBalancerOpResponse(); + + List<DatanodeAdminError> errors = new ArrayList<>(); + for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) { + errors.add(new DatanodeAdminError(e.getHost(), e.getError())); + } + return errors; } @Override - public void stopDiskBalancer(Optional<List<String>> hosts) + public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts) throws IOException { + DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = + DatanodeDiskBalancerOpRequestProto.newBuilder() + .setOpType(DatanodeDiskBalancerOpType.stop); + hosts.ifPresent(requestBuilder::addAllHosts); + + DatanodeDiskBalancerOpResponseProto response = + submitRequest(Type.DatanodeDiskBalancerOp, + builder -> builder.setDatanodeDiskBalancerOpRequest( + requestBuilder.build())) + .getDatanodeDiskBalancerOpResponse(); + List<DatanodeAdminError> errors = new ArrayList<>(); + for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) { + errors.add(new DatanodeAdminError(e.getHost(), e.getError())); + } + return errors; } @Override - public void updateDiskBalancerConfiguration(Optional<Double> threshold, - Optional<Double> bandwidth, Optional<List<String>> hosts) + public List<DatanodeAdminError> updateDiskBalancerConfiguration( + Optional<Double> threshold, Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException { + HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = + HddsProtos.DiskBalancerConfigurationProto.newBuilder(); + threshold.ifPresent(confBuilder::setThreshold); + bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB); + 
parallelThread.ifPresent(confBuilder::setParallelThread); + + DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = + DatanodeDiskBalancerOpRequestProto.newBuilder() + .setOpType(DatanodeDiskBalancerOpType.update) + .setConf(confBuilder); + hosts.ifPresent(requestBuilder::addAllHosts); + + DatanodeDiskBalancerOpResponseProto response = + submitRequest(Type.DatanodeDiskBalancerOp, + builder -> builder.setDatanodeDiskBalancerOpRequest( + requestBuilder.build())) + .getDatanodeDiskBalancerOpResponse(); + + List<DatanodeAdminError> errors = new ArrayList<>(); + for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) { + errors.add(new DatanodeAdminError(e.getHost(), e.getError())); + } + return errors; } @Override diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index 4730a4ccc7..fca2232921 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -85,6 +85,7 @@ message ScmContainerLocationRequest { optional GetContainersOnDecomNodeRequestProto getContainersOnDecomNodeRequest = 46; optional GetMetricsRequestProto getMetricsRequest = 47; optional DatanodeDiskBalancerInfoRequestProto DatanodeDiskBalancerInfoRequest = 48; + optional DatanodeDiskBalancerOpRequestProto datanodeDiskBalancerOpRequest = 49; } message ScmContainerLocationResponse { @@ -141,6 +142,7 @@ message ScmContainerLocationResponse { optional GetContainersOnDecomNodeResponseProto getContainersOnDecomNodeResponse = 46; optional GetMetricsResponseProto getMetricsResponse = 47; optional DatanodeDiskBalancerInfoResponseProto DatanodeDiskBalancerInfoResponse = 48; + optional DatanodeDiskBalancerOpResponseProto datanodeDiskBalancerOpResponse = 49; enum Status { OK = 1; @@ -196,6 +198,7 @@ enum Type { GetContainersOnDecomNode = 42; GetMetrics = 43; DatanodeDiskBalancerInfo = 44; + 
DatanodeDiskBalancerOp = 45; } /** @@ -372,12 +375,29 @@ message DatanodeDiskBalancerInfoRequestProto { required DatanodeDiskBalancerInfoType infoType = 1; optional uint32 count = 2; repeated string hosts = 3; + optional DiskBalancerRunningStatus status = 4; } message DatanodeDiskBalancerInfoResponseProto { repeated DatanodeDiskBalancerInfoProto info = 1; } +enum DatanodeDiskBalancerOpType{ + start = 1; + stop = 2; + update = 3; +} + +message DatanodeDiskBalancerOpRequestProto { + required DatanodeDiskBalancerOpType opType = 1; + repeated string hosts = 2; + optional DiskBalancerConfigurationProto conf = 3; +} + +message DatanodeDiskBalancerOpResponseProto { + repeated DatanodeAdminErrorResponseProto failedHosts = 1; +} + /* Decommission a list of hosts */ diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 803842ed2d..a53d035109 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -524,13 +524,19 @@ message InnerNode { message DiskBalancerConfigurationProto { optional double threshold = 1; - optional double diskBandwidth = 2; + optional uint64 diskBandwidthInMB = 2; optional int32 parallelThread = 3; } +enum DiskBalancerRunningStatus { + RUNNING = 1; + STOPPED = 2; + UNKNOWN = 3; +} + message DatanodeDiskBalancerInfoProto { required DatanodeDetailsProto node = 1; required double currentVolumeDensitySum = 2; - optional bool diskBalancerRunning = 3; + optional DiskBalancerRunningStatus runningStatus = 3; optional DiskBalancerConfigurationProto diskBalancerConf = 4; } diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 2994073c02..7a265dd0ba 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -139,6 +139,7 @@ message SCMHeartbeatRequestProto { optional PipelineReportsProto pipelineReports = 8; optional LayoutVersionProto dataNodeLayoutVersion = 9; optional CommandQueueReportProto commandQueueReport = 10; + optional DiskBalancerReportProto diskBalancerReport = 11; } message CommandQueueReportProto { @@ -328,6 +329,7 @@ message SCMCommandProto { finalizeNewLayoutVersionCommand = 9; refreshVolumeUsageInfo = 10; reconstructECContainersCommand = 11; + diskBalancerCommand = 12; } // TODO: once we start using protoc 3.x, refactor this message using "oneof" required Type commandType = 1; @@ -343,6 +345,7 @@ message SCMCommandProto { finalizeNewLayoutVersionCommandProto = 10; optional RefreshVolumeUsageCommandProto refreshVolumeUsageCommandProto = 11; optional ReconstructECContainersCommandProto reconstructECContainersCommandProto = 12; + optional DiskBalancerCommandProto diskBalancerCommandProto = 13; // If running upon Ratis, holds term of underlying RaftServer iff current @@ -445,6 +448,14 @@ message DatanodeDetailsAndReplicaIndexProto { required int32 replicaIndex = 2; } +/** +This command asks the datanode to update diskBalancer status + */ +message DiskBalancerCommandProto { + required bool shouldRun = 1; + optional DiskBalancerConfigurationProto diskBalancerConf = 2; +} + /** This command asks the datanode to create a pipeline. */ @@ -483,6 +494,12 @@ message CRLStatusReport { repeated int64 pendingCrlIds=2; } +message DiskBalancerReportProto { + required bool isRunning = 1; + optional uint64 balancedBytes = 2; + optional DiskBalancerConfigurationProto diskBalancerConf = 3; +} + /** * This command asks the datanode to process a new CRL. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 0cc205b2ff..d22a878249 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CRLStatu import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; @@ -217,6 +218,14 @@ public final class SCMEvents { new TypedEvent<>(CRLStatusReportFromDatanode.class, "Crl_Status_Report"); + /** + * DiskBalancer reports are sent out by Datanodes. This report is received by + * SCMDatanodeHeartbeatDispatcher and DiskBalancer_Report Event is generated. + */ + public static final TypedEvent<DiskBalancerReportFromDatanode> + DISK_BALANCER_REPORT = new TypedEvent<>( + DiskBalancerReportFromDatanode.class, "DiskBalancer_Report"); + /** * Private Ctor. Never Constructed. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java index 0b16c1aa95..c6114c52f5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java @@ -22,7 +22,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -47,10 +48,13 @@ public class DiskBalancerManager { public static final Logger LOG = LoggerFactory.getLogger(DiskBalancerManager.class); + + private OzoneConfiguration conf; private final EventPublisher scmNodeEventPublisher; private final SCMContext scmContext; private final NodeManager nodeManager; private Map<DatanodeDetails, DiskBalancerStatus> statusMap; + private Map<DatanodeDetails, Long> balancedBytesMap; private boolean useHostnames; /** @@ -60,6 +64,7 @@ public class DiskBalancerManager { EventPublisher eventPublisher, SCMContext scmContext, NodeManager nodeManager) { + this.conf = conf; this.scmNodeEventPublisher = eventPublisher; this.scmContext = scmContext; this.nodeManager = nodeManager; @@ -95,42 +100,60 @@ public class DiskBalancerManager { * If hosts is null, return status of all datanodes in balancing. 
*/ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( - Optional<List<String>> hosts, int clientVersion) throws IOException { - List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList = - new ArrayList<>(); + Optional<List<String>> hosts, + Optional<HddsProtos.DiskBalancerRunningStatus> status, + int clientVersion) throws IOException { List<DatanodeDetails> filterDns = null; if (hosts.isPresent() && !hosts.get().isEmpty()) { filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), useHostnames); } - for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE, - HddsProtos.NodeState.HEALTHY)) { - if (shouldReturnDatanode(filterDns, datanodeDetails)) { - double volumeDensitySum = - getVolumeDataDensitySumForDatanodeDetails(datanodeDetails); - statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder() - .setCurrentVolumeDensitySum(volumeDensitySum) - .setDiskBalancerRunning(isRunning(datanodeDetails)) - .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails, - DiskBalancerStatus.DUMMY_STATUS) - .getDiskBalancerConfiguration().toProtobufBuilder()) - .setNode(datanodeDetails.toProto(clientVersion)) - .build()); - } + // Filter Running Status by default + HddsProtos.DiskBalancerRunningStatus filterStatus = status.orElse(null); + + if (filterDns != null) { + return filterDns.stream() + .filter(dn -> shouldReturnDatanode(filterStatus, dn)) + .map(nodeManager::getDatanodeInfo) + .map(dn -> getInfoProto(dn, clientVersion)) + .collect(Collectors.toList()); + } else { + return nodeManager.getAllNodes().stream() + .filter(dn -> shouldReturnDatanode(filterStatus, dn)) + .map(dn -> getInfoProto((DatanodeInfo)dn, clientVersion)) + .collect(Collectors.toList()); } - return statusList; } - private boolean shouldReturnDatanode(List<DatanodeDetails> hosts, + private boolean shouldReturnDatanode( + HddsProtos.DiskBalancerRunningStatus status, DatanodeDetails datanodeDetails) { - if (hosts == null || hosts.isEmpty()) { 
- return isRunning(datanodeDetails); - } else { - return hosts.contains(datanodeDetails); + boolean shouldReturn = true; + // If status specified, do not return if status not match. + if (status != null && getRunningStatus(datanodeDetails) != status) { + shouldReturn = false; } + return shouldReturn; } + private HddsProtos.DatanodeDiskBalancerInfoProto getInfoProto( + DatanodeInfo dn, int clientVersion) { + double volumeDensitySum = + getVolumeDataDensitySumForDatanodeDetails(dn); + HddsProtos.DiskBalancerRunningStatus runningStatus = + getRunningStatus(dn); + HddsProtos.DatanodeDiskBalancerInfoProto.Builder builder = + HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder() + .setNode(dn.toProto(clientVersion)) + .setCurrentVolumeDensitySum(volumeDensitySum) + .setRunningStatus(getRunningStatus(dn)); + if (runningStatus != HddsProtos.DiskBalancerRunningStatus.UNKNOWN) { + builder.setDiskBalancerConf(statusMap.get(dn) + .getDiskBalancerConfiguration().toProtobufBuilder()); + } + return builder.build(); + } /** * Get volume density for a specific DatanodeDetails node. 
* @@ -144,8 +167,7 @@ public class DiskBalancerManager { DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails; double totalCapacity = 0d, totalUsed = 0d; - for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto : - datanodeInfo.getStorageReports()) { + for (StorageReportProto reportProto : datanodeInfo.getStorageReports()) { totalCapacity += reportProto.getCapacity(); totalUsed += reportProto.getScmUsed(); } @@ -162,10 +184,17 @@ public class DiskBalancerManager { return volumeDensitySum; } - private boolean isRunning(DatanodeDetails datanodeDetails) { - return statusMap - .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS) - .isRunning(); + private HddsProtos.DiskBalancerRunningStatus getRunningStatus( + DatanodeDetails datanodeDetails) { + if (!statusMap.containsKey(datanodeDetails)) { + return HddsProtos.DiskBalancerRunningStatus.UNKNOWN; + } else { + if (statusMap.get(datanodeDetails).isRunning()) { + return HddsProtos.DiskBalancerRunningStatus.RUNNING; + } else { + return HddsProtos.DiskBalancerRunningStatus.STOPPED; + } + } } @VisibleForTesting @@ -173,4 +202,24 @@ public class DiskBalancerManager { statusMap.put(datanodeDetails, new DiskBalancerStatus(true, new DiskBalancerConfiguration())); } + + public void processDiskBalancerReport(DiskBalancerReportProto reportProto, + DatanodeDetails dn) { + boolean isRunning = reportProto.getIsRunning(); + DiskBalancerConfiguration diskBalancerConfiguration = + reportProto.hasDiskBalancerConf() ? 
+ DiskBalancerConfiguration.fromProtobuf( + reportProto.getDiskBalancerConf(), conf) : + new DiskBalancerConfiguration(); + statusMap.put(dn, new DiskBalancerStatus(isRunning, + diskBalancerConfiguration)); + if (reportProto.hasBalancedBytes()) { + balancedBytesMap.put(dn, reportProto.getBalancedBytes()); + } + } + + @VisibleForTesting + public Map<DatanodeDetails, DiskBalancerStatus> getStatusMap() { + return statusMap; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java new file mode 100644 index 0000000000..47158972ec --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles DiskBalancer Reports from datanode. + */ +public class DiskBalancerReportHandler implements + EventHandler<DiskBalancerReportFromDatanode> { + + private static final Logger LOGGER = LoggerFactory + .getLogger(DiskBalancerReportHandler.class); + + private DiskBalancerManager diskBalancerManager; + + public DiskBalancerReportHandler(DiskBalancerManager diskBalancerManager) { + this.diskBalancerManager = diskBalancerManager; + } + + @Override + public void onMessage(DiskBalancerReportFromDatanode reportFromDatanode, + EventPublisher publisher) { + Preconditions.checkNotNull(reportFromDatanode); + DatanodeDetails dn = reportFromDatanode.getDatanodeDetails(); + DiskBalancerReportProto diskBalancerReportProto = + reportFromDatanode.getReport(); + Preconditions.checkNotNull(dn, + "DiskBalancer Report is missing DatanodeDetails."); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Processing diskBalancer report for dn: {}", dn); + } + try { + diskBalancerManager.processDiskBalancerReport( + diskBalancerReportProto, dn); + } catch (Exception e) { + LOGGER.error("Failed to process diskBalancer report={} from dn={}.", + diskBalancerReportProto, dn, e); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java index a064a6ff9f..8d2cb26c36 100644 --- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java @@ -32,8 +32,6 @@ public class DiskBalancerStatus { private boolean isRunning; private DiskBalancerConfiguration diskBalancerConfiguration; - public static final DiskBalancerStatus DUMMY_STATUS = - new DiskBalancerStatus(false, new DiskBalancerConfiguration()); public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) { this.isRunning = isRunning; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 21bcd1f78a..6063e6dd88 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -175,6 +175,14 @@ public interface NodeManager extends StorageContainerNodeProtocol, */ DatanodeUsageInfo getUsageInfo(DatanodeDetails dn); + /** + * Get the datanode info of a specified datanode. + * + * @param dn the datanode whose info we want to get + * @return DatanodeInfo of the specified datanode + */ + DatanodeInfo getDatanodeInfo(DatanodeDetails dn); + /** * Return the node stat of the specified datanode. * @param datanodeDetails DatanodeDetails. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index a149998db8..e3a7046afa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -983,6 +983,23 @@ public class SCMNodeManager implements NodeManager { return usageInfo; } + /** + * Get the datanode info of a specified datanode. 
+ * + * @param dn the datanode whose info we want to get + * @return DatanodeInfo of the specified datanode + */ + @Override + public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { + try { + return nodeStateManager.getNode(dn); + } catch (NodeNotFoundException e) { + LOG.warn("Cannot retrieve DatanodeInfo, datanode {} not found.", + dn.getUuid()); + return null; + } + } + /** * Return the node stat of the specified datanode. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 71a8edad41..60b17d1fcb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto; @@ -731,6 +732,13 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB request.getDatanodeDiskBalancerInfoRequest(), request.getVersion())) .build(); + case DatanodeDiskBalancerOp: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setDatanodeDiskBalancerOpResponse(getDatanodeDiskBalancerOp( + request.getDatanodeDiskBalancerOpRequest())) + .build(); default: throw new IllegalArgumentException( "Unknown command type: " + request.getCmdType()); @@ -1317,7 +1325,9 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB break; case status: infoProtoList = impl.getDiskBalancerStatus( - Optional.of(request.getHostsList()), clientVersion); + Optional.of(request.getHostsList()), + Optional.of(request.getStatus()), + clientVersion); break; default: infoProtoList = null; @@ -1330,4 +1340,43 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB public GetMetricsResponseProto getMetrics(GetMetricsRequestProto request) throws IOException { return GetMetricsResponseProto.newBuilder().setMetricsJson(impl.getMetrics(request.getQuery())).build(); } + + public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp( + StorageContainerLocationProtocolProtos. 
+ DatanodeDiskBalancerOpRequestProto request) + throws IOException { + List<DatanodeAdminError> errors; + switch (request.getOpType()) { + case start: + errors = impl.startDiskBalancer( + Optional.of(request.getConf().getThreshold()), + Optional.of(request.getConf().getDiskBandwidthInMB()), + Optional.of(request.getConf().getParallelThread()), + Optional.of(request.getHostsList())); + break; + case update: + errors = impl.updateDiskBalancerConfiguration( + Optional.of(request.getConf().getThreshold()), + Optional.of(request.getConf().getDiskBandwidthInMB()), + Optional.of(request.getConf().getParallelThread()), + Optional.of(request.getHostsList())); + break; + case stop: + errors = impl.stopDiskBalancer(Optional.of(request.getHostsList())); + break; + default: + errors = new ArrayList<>(); + } + + DatanodeDiskBalancerOpResponseProto.Builder response = + DatanodeDiskBalancerOpResponseProto.newBuilder(); + for (DatanodeAdminError e : errors) { + DatanodeAdminErrorResponseProto.Builder error = + DatanodeAdminErrorResponseProto.newBuilder(); + error.setHost(e.getHostname()); + error.setError(e.getError()); + response.addFailedHosts(error); + } + return response.build(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 6da9907db3..91883985d7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1290,7 +1290,9 @@ public class SCMClientProtocolServer implements @Override public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( - Optional<List<String>> hosts, int clientVersion) throws IOException { + Optional<List<String>> hosts, + Optional<HddsProtos.DiskBalancerRunningStatus> status, + int clientVersion) 
throws IOException { // check admin authorisation try { getScm().checkAdminAccess(getRemoteUser(), true); @@ -1299,29 +1301,33 @@ public class SCMClientProtocolServer implements throw e; } - return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, + return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, status, clientVersion); } @Override - public void startDiskBalancer(Optional<Double> threshold, - Optional<Double> bandwidth, Optional<List<String>> hosts) - throws IOException { + public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold, + Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, + Optional<List<String>> hosts) throws IOException { // TODO: Send message to datanodes + return null; } @Override - public void stopDiskBalancer(Optional<List<String>> hosts) + public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts) throws IOException { // TODO: Send message to datanodes + return null; } @Override - public void updateDiskBalancerConfiguration(Optional<Double> threshold, - Optional<Double> bandwidth, Optional<List<String>> hosts) + public List<DatanodeAdminError> updateDiskBalancerConfiguration( + Optional<Double> threshold, Optional<Long> bandwidthInMB, + Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException { // TODO: Send message to datanodes + return null; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index 484a1e6f0f..59ff40ebf9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CRLStatusReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto; @@ -57,6 +58,7 @@ import java.util.UUID; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.DISK_BALANCER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents .INCREMENTAL_CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; @@ -201,6 +203,15 @@ public final class SCMDatanodeHeartbeatDispatcher { commandStatusReport)); } } + + if (heartbeat.hasDiskBalancerReport()) { + LOG.debug("Dispatching DiskBalancer Report."); + eventPublisher.fireEvent( + DISK_BALANCER_REPORT, + new DiskBalancerReportFromDatanode( + datanodeDetails, + heartbeat.getDiskBalancerReport())); + } } return commands; @@ -453,4 +464,16 @@ public final class SCMDatanodeHeartbeatDispatcher { super(datanodeDetails, report); } } + + /** + * DiskBalancer report event payload with origin. 
+ */ + public static class DiskBalancerReportFromDatanode + extends ReportFromDatanode<DiskBalancerReportProto> { + + public DiskBalancerReportFromDatanode(DatanodeDetails datanodeDetails, + DiskBalancerReportProto report) { + super(datanodeDetails, report); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 7b14cf2e1b..c1362dd298 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.node.DiskBalancerManager; +import org.apache.hadoop.hdds.scm.node.DiskBalancerReportHandler; import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler; import org.apache.hadoop.hdds.scm.security.SecretKeyManagerService; import org.apache.hadoop.hdds.scm.security.RootCARotationManager; @@ -507,6 +508,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl new PipelineActionHandler(pipelineManager, scmContext, configuration); CRLStatusReportHandler crlStatusReportHandler = new CRLStatusReportHandler(certificateStore, configuration); + DiskBalancerReportHandler diskBalancerReportHandler = + new DiskBalancerReportHandler(diskBalancerManager); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); @@ -584,6 +587,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl scmNodeManager.registerSendCommandNotify( SCMCommandProto.Type.deleteBlocksCommand, scmBlockManager.getDeletedBlockLog()::onSent); + 
eventQueue.addHandler(SCMEvents.DISK_BALANCER_REPORT, + diskBalancerReportHandler); } private void initializeCertificateClient() throws IOException { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 21c3f1c9a8..aeda597ab6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -349,7 +349,36 @@ public class MockNodeManager implements NodeManager { */ @Override public List<DatanodeDetails> getAllNodes() { - return new ArrayList<>(nodeMetricMap.keySet()); + // mock storage reports for TestDiskBalancer + List<DatanodeDetails> healthyNodesWithInfo = new ArrayList<>(); + for (Map.Entry<DatanodeDetails, SCMNodeStat> entry: + nodeMetricMap.entrySet()) { + NodeStatus nodeStatus = NodeStatus.inServiceHealthy(); + if (staleNodes.contains(entry.getKey())) { + nodeStatus = NodeStatus.inServiceStale(); + } else if (deadNodes.contains(entry.getKey())) { + nodeStatus = NodeStatus.inServiceDead(); + } + DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus, + UpgradeUtils.defaultLayoutVersionProto()); + + long capacity = entry.getValue().getCapacity().get(); + long used = entry.getValue().getScmUsed().get(); + long remaining = entry.getValue().getRemaining().get(); + StorageReportProto storage1 = HddsTestUtils.createStorageReport( + di.getUuid(), "/data1-" + di.getUuidString(), + capacity, used, remaining, null); + MetadataStorageReportProto metaStorage1 = + HddsTestUtils.createMetadataStorageReport( + "/metadata1-" + di.getUuidString(), capacity, used, + remaining, null); + di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1))); + di.updateMetaDataStorageReports( + new ArrayList<>(Arrays.asList(metaStorage1))); + + healthyNodesWithInfo.add(di); + } 
+ return healthyNodesWithInfo; } /** @@ -412,6 +441,26 @@ public class MockNodeManager implements NodeManager { return new DatanodeUsageInfo(datanodeDetails, stat); } + @Override + public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { + DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), + UpgradeUtils.defaultLayoutVersionProto()); + long capacity = nodeMetricMap.get(dd).getCapacity().get(); + long used = nodeMetricMap.get(dd).getScmUsed().get(); + long remaining = nodeMetricMap.get(dd).getRemaining().get(); + StorageReportProto storage1 = HddsTestUtils.createStorageReport( + di.getUuid(), "/data1-" + di.getUuidString(), + capacity, used, remaining, null); + MetadataStorageReportProto metaStorage1 = + HddsTestUtils.createMetadataStorageReport( + "/metadata1-" + di.getUuidString(), capacity, used, + remaining, null); + di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1))); + di.updateMetaDataStorageReports( + new ArrayList<>(Arrays.asList(metaStorage1))); + return di; + } + /** * Return the node stat of the specified datanode. * @param datanodeDetails - datanode details. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 9649159de3..0838e56ba8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -245,6 +245,11 @@ public class SimpleMockNodeManager implements NodeManager { return null; } + @Override + public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { + return null; + } + @Override public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { return null; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java index 541c9764b7..7005dea292 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java @@ -21,8 +21,10 @@ import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.ClientVersion; import org.apache.ozone.test.GenericTestUtils; @@ -33,6 +35,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import 
java.util.List; import java.util.Optional; +import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; @@ -46,6 +49,8 @@ public class TestDiskBalancerManager { private NodeManager nodeManager; private OzoneConfiguration conf; private String storageDir; + private DiskBalancerReportHandler diskBalancerReportHandler; + private Random random; @BeforeEach public void setup() throws Exception { @@ -56,6 +61,9 @@ public class TestDiskBalancerManager { nodeManager = new MockNodeManager(true, 3); diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(), SCMContext.emptyContext(), nodeManager); + diskBalancerReportHandler = + new DiskBalancerReportHandler(diskBalancerManager); + random = new Random(); } @Test @@ -73,6 +81,7 @@ public class TestDiskBalancerManager { @Test public void testDatanodeDiskBalancerStatus() throws IOException { diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0)); + diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(1)); // Simulate users asking all status of 3 datanodes List<String> dns = nodeManager.getAllNodes().stream().map( @@ -81,6 +90,7 @@ public class TestDiskBalancerManager { List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList = diskBalancerManager.getDiskBalancerStatus(Optional.of(dns), + Optional.empty(), ClientVersion.CURRENT_VERSION); Assertions.assertEquals(3, statusProtoList.size()); @@ -92,8 +102,32 @@ public class TestDiskBalancerManager { statusProtoList = diskBalancerManager.getDiskBalancerStatus(Optional.of(dns), + Optional.empty(), ClientVersion.CURRENT_VERSION); Assertions.assertEquals(1, statusProtoList.size()); } + + @Test + public void testHandleDiskBalancerReportFromDatanode() { + for (DatanodeDetails dn: nodeManager.getAllNodes()) { + diskBalancerReportHandler.onMessage( + new DiskBalancerReportFromDatanode(dn, generateRandomReport()), null); + } + + Assertions.assertEquals(3, diskBalancerManager.getStatusMap().size()); + } + + private 
DiskBalancerReportProto generateRandomReport() { + return DiskBalancerReportProto.newBuilder() + .setIsRunning(random.nextBoolean()) + .setBalancedBytes(random.nextInt(10000)) + .setDiskBalancerConf( + HddsProtos.DiskBalancerConfigurationProto.newBuilder() + .setThreshold(random.nextInt(99)) + .setParallelThread(random.nextInt(4) + 1) + .setDiskBandwidthInMB(random.nextInt(99) + 1) + .build()) + .build(); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 92a6fd455d..4b83abff83 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -195,6 +196,11 @@ public class ReplicationNodeManagerMock implements NodeManager { return null; } + @Override + public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { + return null; + } + /** * Return the node stat of the specified datanode. 
* diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index cb3fdb42f8..641060a1e5 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -575,31 +575,34 @@ public class ContainerOperationClient implements ScmClient { } @Override - public void startDiskBalancer(Optional<Double> threshold, - Optional<Double> bandwidth, Optional<List<String>> hosts) - throws IOException { - storageContainerLocationClient.startDiskBalancer(threshold, bandwidth, - hosts); + public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold, + Optional<Long> bandwidthInMB, Optional<Integer> parallelThread, + Optional<List<String>> hosts) throws IOException { + return storageContainerLocationClient.startDiskBalancer(threshold, + bandwidthInMB, parallelThread, hosts); } @Override - public void stopDiskBalancer(Optional<List<String>> hosts) + public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts) throws IOException { - storageContainerLocationClient.stopDiskBalancer(hosts); + return storageContainerLocationClient.stopDiskBalancer(hosts); } @Override public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus( - Optional<List<String>> hosts) throws IOException { + Optional<List<String>> hosts, + Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus) + throws IOException { return storageContainerLocationClient.getDiskBalancerStatus(hosts, - ClientVersion.CURRENT_VERSION); + runningStatus, ClientVersion.CURRENT_VERSION); } @Override - public void updateDiskBalancerConfiguration(Optional<Double> threshold, - Optional<Double> bandwidth, Optional<List<String>> hosts) + public List<DatanodeAdminError> updateDiskBalancerConfiguration( + 
Optional<Double> threshold, Optional<Long> bandwidth, + Optional<Integer> parallelThread, Optional<List<String>> hosts) throws IOException { - storageContainerLocationClient.updateDiskBalancerConfiguration(threshold, - bandwidth, hosts); + return storageContainerLocationClient.updateDiskBalancerConfiguration( + threshold, bandwidth, parallelThread, hosts); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java index c047cd3ee0..7eaca03fed 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; import org.apache.hadoop.hdds.scm.client.ScmClient; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; +import org.apache.hadoop.hdds.scm.node.DiskBalancerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -48,6 +49,7 @@ public class TestDiskBalancer { private static ScmClient storageClient; private static MiniOzoneCluster cluster; private static OzoneConfiguration ozoneConf; + private static DiskBalancerManager diskBalancerManager; @BeforeAll public static void setup() throws Exception { @@ -57,6 +59,8 @@ public class TestDiskBalancer { cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build(); storageClient = new ContainerOperationClient(ozoneConf); cluster.waitForClusterToBeReady(); + diskBalancerManager = cluster.getStorageContainerManager() + .getDiskBalancerManager(); for (DatanodeDetails dn: cluster.getStorageContainerManager() .getScmNodeManager().getAllNodes()) { 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
