This is an automated email from the ASF dual-hosted git repository. sodonnell pushed a commit to branch HDDS-1880-Decom in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push: new d902586 HDDS-2592. Add Datanode command to allow the datanode to persist its admin state (#521) d902586 is described below commit d902586b7ca39083ba66ea19778b537212bc42d3 Author: Stephen O'Donnell <stephen.odonn...@gmail.com> AuthorDate: Tue Mar 31 22:58:40 2020 +0100 HDDS-2592. Add Datanode command to allow the datanode to persist its admin state (#521) --- .../hadoop/hdds/protocol/DatanodeDetails.java | 98 ++++++++++++- hadoop-hdds/common/src/main/proto/hdds.proto | 2 + .../container/common/helpers/DatanodeIdYaml.java | 35 +++++ .../common/statemachine/DatanodeStateMachine.java | 2 + .../SetNodeOperationalStateCommandHandler.java | 157 +++++++++++++++++++++ .../states/endpoint/HeartbeatEndpointTask.java | 12 ++ .../commands/SetNodeOperationalStateCommand.java | 89 ++++++++++++ .../proto/StorageContainerDatanodeProtocol.proto | 8 ++ .../hdds/scm/node/DatanodeAdminMonitorImpl.java | 3 +- .../hdds/scm/node/DatanodeAdminNodeDetails.java | 8 ++ .../hdds/scm/node/NodeDecommissionManager.java | 8 +- .../apache/hadoop/hdds/scm/node/NodeManager.java | 12 ++ .../hadoop/hdds/scm/node/NodeStateManager.java | 26 +++- .../apache/hadoop/hdds/scm/node/NodeStatus.java | 32 ++++- .../hadoop/hdds/scm/node/SCMNodeManager.java | 81 ++++++++++- .../hadoop/hdds/scm/node/states/NodeStateMap.java | 5 +- .../hdds/scm/server/SCMDatanodeProtocolServer.java | 8 ++ .../hadoop/hdds/scm/container/MockNodeManager.java | 10 ++ .../hdds/scm/container/SimpleMockNodeManager.java | 10 +- .../hdds/scm/node/TestNodeDecommissionManager.java | 5 + .../hadoop/hdds/scm/node/TestSCMNodeManager.java | 30 ++++ .../hdds/scm/node/states/TestNodeStateMap.java | 5 +- .../testutils/ReplicationNodeManagerMock.java | 15 +- 23 files changed, 643 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index 698a443..a2584d8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -49,6 +49,8 @@ public class DatanodeDetails extends NodeImpl implements private String hostName; private List<Port> ports; private String certSerialId; + private HddsProtos.NodeOperationalState persistedOpState; + private long persistedOpStateExpiryEpochSec = 0; /** * Constructs DatanodeDetails instance. DatanodeDetails.Builder is used @@ -59,15 +61,23 @@ public class DatanodeDetails extends NodeImpl implements * @param networkLocation DataNode's network location path * @param ports Ports used by the DataNode * @param certSerialId serial id from SCM issued certificate. + * @param persistedOpState Operational State stored on DN. + * @param persistedOpStateExpiryEpochSec Seconds after the epoch the stored + * state should expire. 
*/ + @SuppressWarnings("checkstyle:ParameterNumber") private DatanodeDetails(String uuid, String ipAddress, String hostName, - String networkLocation, List<Port> ports, String certSerialId) { + String networkLocation, List<Port> ports, String certSerialId, + HddsProtos.NodeOperationalState persistedOpState, + long persistedOpStateExpiryEpochSec) { super(hostName, networkLocation, NetConstants.NODE_COST_DEFAULT); this.uuid = UUID.fromString(uuid); this.ipAddress = ipAddress; this.hostName = hostName; this.ports = ports; this.certSerialId = certSerialId; + this.persistedOpState = persistedOpState; + this.persistedOpStateExpiryEpochSec = persistedOpStateExpiryEpochSec; } protected DatanodeDetails(DatanodeDetails datanodeDetails) { @@ -78,6 +88,9 @@ public class DatanodeDetails extends NodeImpl implements this.hostName = datanodeDetails.hostName; this.ports = datanodeDetails.ports; this.setNetworkName(datanodeDetails.getNetworkName()); + this.persistedOpState = datanodeDetails.getPersistedOpState(); + this.persistedOpStateExpiryEpochSec = + datanodeDetails.getPersistedOpStateExpiryEpochSec(); } /** @@ -156,6 +169,46 @@ public class DatanodeDetails extends NodeImpl implements } /** + * Return the persistedOpState. If the stored value is null, return the + * default value of IN_SERVICE. + * + * @return The OperationalState persisted on the datanode. + */ + public HddsProtos.NodeOperationalState getPersistedOpState() { + if (persistedOpState == null) { + return HddsProtos.NodeOperationalState.IN_SERVICE; + } else { + return persistedOpState; + } + } + + /** + * Set the persistedOpState for this instance. + * + * @param state The new operational state. + */ + public void setPersistedOpState(HddsProtos.NodeOperationalState state) { + this.persistedOpState = state; + } + + /** + * Get the persistedOpStateExpiryEpochSec for the instance. + * @return Seconds from the epoch when the operational state should expire. 
+ */ + public long getPersistedOpStateExpiryEpochSec() { + return persistedOpStateExpiryEpochSec; + } + + /** + * Set persistedOpStateExpiryEpochSec. + * @param expiry The number of second after the epoch the operational state + * should expire. + */ + public void setPersistedOpStateExpiryEpochSec(long expiry) { + this.persistedOpStateExpiryEpochSec = expiry; + } + + /** * Given the name returns port number, null if the asked port is not found. * * @param name Name of the port @@ -200,6 +253,13 @@ public class DatanodeDetails extends NodeImpl implements if (datanodeDetailsProto.hasNetworkLocation()) { builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation()); } + if (datanodeDetailsProto.hasPersistedOpState()) { + builder.setPersistedOpState(datanodeDetailsProto.getPersistedOpState()); + } + if (datanodeDetailsProto.hasPersistedOpStateExpiry()) { + builder.setPersistedOpStateExpiry( + datanodeDetailsProto.getPersistedOpStateExpiry()); + } return builder.build(); } @@ -226,6 +286,10 @@ public class DatanodeDetails extends NodeImpl implements if (!Strings.isNullOrEmpty(getNetworkLocation())) { builder.setNetworkLocation(getNetworkLocation()); } + if (persistedOpState != null) { + builder.setPersistedOpState(persistedOpState); + } + builder.setPersistedOpStateExpiry(persistedOpStateExpiryEpochSec); for (Port port : ports) { builder.addPorts(HddsProtos.Port.newBuilder() @@ -246,6 +310,8 @@ public class DatanodeDetails extends NodeImpl implements ", networkLocation: " + getNetworkLocation() + ", certSerialId: " + certSerialId + + ", persistedOpState: " + persistedOpState + + ", persistedOpStateExpiryEpochSec: " + persistedOpStateExpiryEpochSec + "}"; } @@ -285,6 +351,8 @@ public class DatanodeDetails extends NodeImpl implements private String networkLocation; private List<Port> ports; private String certSerialId; + private HddsProtos.NodeOperationalState persistedOpState; + private long persistedOpStateExpiryEpochSec = 0; /** * Default private constructor. 
To create Builder instance use @@ -374,6 +442,31 @@ public class DatanodeDetails extends NodeImpl implements } /** + * Adds persistedOpState. + * + * @param state The operational state persisted on the datanode + * + * @return DatanodeDetails.Builder + */ + public Builder setPersistedOpState(HddsProtos.NodeOperationalState state) { + this.persistedOpState = state; + return this; + } + + /** + * Adds persistedOpStateExpiryEpochSec. + * + * @param expiry The seconds after the epoch the operational state should + * expire. + * + * @return DatanodeDetails.Builder + */ + public Builder setPersistedOpStateExpiry(long expiry) { + this.persistedOpStateExpiryEpochSec = expiry; + return this; + } + + /** * Builds and returns DatanodeDetails instance. * * @return DatanodeDetails @@ -384,7 +477,8 @@ public class DatanodeDetails extends NodeImpl implements networkLocation = NetConstants.DEFAULT_RACK; } DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName, - networkLocation, ports, certSerialId); + networkLocation, ports, certSerialId, persistedOpState, + persistedOpStateExpiryEpochSec); if (networkName != null) { dn.setNetworkName(networkName); } diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index ad45d30..f629560 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -37,6 +37,8 @@ message DatanodeDetailsProto { // network name, can be Ip address or host name, depends optional string networkName = 6; optional string networkLocation = 7; // Network topology location + optional NodeOperationalState persistedOpState = 8; // The Operational state persisted in the datanode.id file + optional int64 persistedOpStateExpiry = 9; // The seconds after the epoch when the OpState should expire } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java index d3efa98..d016569 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeIdYaml.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.yaml.snakeyaml.DumperOptions; import org.yaml.snakeyaml.Yaml; @@ -82,6 +83,12 @@ public final class DatanodeIdYaml { .setIpAddress(datanodeDetailsYaml.getIpAddress()) .setHostName(datanodeDetailsYaml.getHostName()) .setCertSerialId(datanodeDetailsYaml.getCertSerialId()); + if (datanodeDetailsYaml.getPersistedOpState() != null) { + builder.setPersistedOpState(HddsProtos.NodeOperationalState.valueOf( + datanodeDetailsYaml.getPersistedOpState())); + } + builder.setPersistedOpStateExpiry( + datanodeDetailsYaml.getPersistedOpStateExpiryEpochSec()); if (!MapUtils.isEmpty(datanodeDetailsYaml.getPortDetails())) { for (Map.Entry<String, Integer> portEntry : @@ -105,6 +112,8 @@ public final class DatanodeIdYaml { private String ipAddress; private String hostName; private String certSerialId; + private String persistedOpState; + private long persistedOpStateExpiryEpochSec = 0; private Map<String, Integer> portDetails; public DatanodeDetailsYaml() { @@ -113,11 +122,15 @@ public final class DatanodeIdYaml { private DatanodeDetailsYaml(String uuid, String ipAddress, String hostName, String certSerialId, + String persistedOpState, + long persistedOpStateExpiryEpochSec, Map<String, Integer> portDetails) { this.uuid = uuid; this.ipAddress = ipAddress; this.hostName = hostName; this.certSerialId = certSerialId; + this.persistedOpState = 
persistedOpState; + this.persistedOpStateExpiryEpochSec = persistedOpStateExpiryEpochSec; this.portDetails = portDetails; } @@ -137,6 +150,14 @@ public final class DatanodeIdYaml { return certSerialId; } + public String getPersistedOpState() { + return persistedOpState; + } + + public long getPersistedOpStateExpiryEpochSec() { + return persistedOpStateExpiryEpochSec; + } + public Map<String, Integer> getPortDetails() { return portDetails; } @@ -157,6 +178,14 @@ public final class DatanodeIdYaml { this.certSerialId = certSerialId; } + public void setPersistedOpState(String persistedOpState) { + this.persistedOpState = persistedOpState; + } + + public void setPersistedOpStateExpiryEpochSec(long opStateExpiryEpochSec) { + this.persistedOpStateExpiryEpochSec = opStateExpiryEpochSec; + } + public void setPortDetails(Map<String, Integer> portDetails) { this.portDetails = portDetails; } @@ -172,11 +201,17 @@ public final class DatanodeIdYaml { } } + String persistedOpString = null; + if (datanodeDetails.getPersistedOpState() != null) { + persistedOpString = datanodeDetails.getPersistedOpState().name(); + } return new DatanodeDetailsYaml( datanodeDetails.getUuid().toString(), datanodeDetails.getIpAddress(), datanodeDetails.getHostName(), datanodeDetails.getCertSerialId(), + persistedOpString, + datanodeDetails.getPersistedOpStateExpiryEpochSec(), portDetails); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 5229ae8..6835ef6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -50,6 +50,7 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler .DeleteContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .ReplicateContainerCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.replication.ContainerReplicator; @@ -138,6 +139,7 @@ public class DatanodeStateMachine implements Closeable { dnConf.getContainerDeleteThreads())) .addHandler(new ClosePipelineCommandHandler()) .addHandler(new CreatePipelineCommandHandler(conf)) + .addHandler(new SetNodeOperationalStateCommandHandler(conf)) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java new file mode 100644 index 0000000..1fcf5a2 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * Handle the SetNodeOperationalStateCommand sent from SCM to the datanode + * to persist the current operational state. 
+ */ +public class SetNodeOperationalStateCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(SetNodeOperationalStateCommandHandler.class); + private final Configuration conf; + private final AtomicInteger invocationCount = new AtomicInteger(0); + private final AtomicLong totalTime = new AtomicLong(0); + + /** + * Set Node State command handler. + * + * @param conf - Configuration for the datanode. + */ + public SetNodeOperationalStateCommandHandler(Configuration conf) { + this.conf = conf; + } + + /** + * Handles a given SCM command. + * + * @param command - SCM Command + * @param container - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. + */ + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + long startTime = Time.monotonicNow(); + invocationCount.incrementAndGet(); + StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto + setNodeCmdProto = null; + + if (command.getType() != Type.setNodeOperationalStateCommand) { + LOG.warn("Skipping handling command, expected command " + + "type {} but found {}", + Type.setNodeOperationalStateCommand, command.getType()); + return; + } + SetNodeOperationalStateCommand setNodeCmd = + (SetNodeOperationalStateCommand) command; + setNodeCmdProto = setNodeCmd.getProto(); + DatanodeDetails dni = context.getParent().getDatanodeDetails(); + dni.setPersistedOpState(setNodeCmdProto.getNodeOperationalState()); + dni.setPersistedOpStateExpiryEpochSec( + setNodeCmd.getStateExpiryEpochSeconds()); + try { + persistDatanodeDetails(dni); + } catch (IOException ioe) { + LOG.error("Failed to persist the datanode state", ioe); + // TODO - this should probably be raised, but it will break the command + // handler interface. 
+ } + totalTime.addAndGet(Time.monotonicNow() - startTime); + } + + // TODO - this duplicates code in HddsDatanodeService and InitDatanodeState + // Need to refactor. + private void persistDatanodeDetails(DatanodeDetails dnDetails) + throws IOException { + String idFilePath = HddsServerUtil.getDatanodeIdFilePath(conf); + if (idFilePath == null || idFilePath.isEmpty()) { + LOG.error("A valid path is needed for config setting {}", + ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR); + throw new IllegalArgumentException( + ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR + + " must be defined. See" + + " https://wiki.apache.org/hadoop/Ozone#Configuration" + + " for details on configuring Ozone."); + } + + Preconditions.checkNotNull(idFilePath); + File idFile = new File(idFilePath); + ContainerUtils.writeDatanodeDetailsTo(dnDetails, idFile); + } + + /** + * Returns the command type that this command handler handles. + * + * @return Type + */ + @Override + public StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type + getCommandType() { + return Type.setNodeOperationalStateCommand; + } + + /** + * Returns number of times this handler has been invoked. + * + * @return int + */ + @Override + public int getInvocationCount() { + return invocationCount.intValue(); + } + + /** + * Returns the average time this function takes to run. + * + * @return long + */ + @Override + public long getAverageRunTime() { + final int invocations = invocationCount.get(); + return invocations == 0 ? 
+ 0 : totalTime.get() / invocations; + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index a55d0d6..fa66a57 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -52,6 +52,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -331,6 +332,17 @@ public class HeartbeatEndpointTask } this.context.addCommand(closePipelineCommand); break; + case setNodeOperationalStateCommand: + SetNodeOperationalStateCommand setNodeOperationalStateCommand = + SetNodeOperationalStateCommand.getFromProtobuf( + commandResponseProto.getSetNodeOperationalStateCommandProto()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM set operational state command. 
State: {} " + + "Expiry: {}", setNodeOperationalStateCommand.getOpState(), + setNodeOperationalStateCommand.getStateExpiryEpochSeconds()); + } + this.context.addCommand(setNodeOperationalStateCommand); + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCommandType().name()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java new file mode 100644 index 0000000..3ff7949 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SetNodeOperationalStateCommand.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto; + +/** + * A command used to persist the current node operational state on the datanode. + */ +public class SetNodeOperationalStateCommand + extends SCMCommand<SetNodeOperationalStateCommandProto> { + + private final HddsProtos.NodeOperationalState opState; + private long stateExpiryEpochSeconds; + + /** + * Ctor that creates a SetNodeOperationalStateCommand. + * + * @param id - Command ID. Something a time stamp would suffice. + * @param state - OperationalState that want the node to be set into. + * @param stateExpiryEpochSeconds The epoch time when the state should + * expire, or zero for the state to remain + * indefinitely. + */ + public SetNodeOperationalStateCommand(long id, + HddsProtos.NodeOperationalState state, long stateExpiryEpochSeconds) { + super(id); + this.opState = state; + this.stateExpiryEpochSeconds = stateExpiryEpochSeconds; + } + + /** + * Returns the type of this command. + * + * @return Type - This is setNodeOperationalStateCommand. + */ + @Override + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.setNodeOperationalStateCommand; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. 
+ */ + @Override + public SetNodeOperationalStateCommandProto getProto() { + return SetNodeOperationalStateCommandProto.newBuilder() + .setCmdId(getId()) + .setNodeOperationalState(opState) + .setStateExpiryEpochSeconds(stateExpiryEpochSeconds).build(); + } + + public HddsProtos.NodeOperationalState getOpState() { + return opState; + } + + public long getStateExpiryEpochSeconds() { + return stateExpiryEpochSeconds; + } + + public static SetNodeOperationalStateCommand getFromProtobuf( + SetNodeOperationalStateCommandProto cmdProto) { + Preconditions.checkNotNull(cmdProto); + return new SetNodeOperationalStateCommand(cmdProto.getCmdId(), + cmdProto.getNodeOperationalState(), + cmdProto.getStateExpiryEpochSeconds()); + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 171ea86..4a977e1 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -287,6 +287,7 @@ message SCMCommandProto { replicateContainerCommand = 5; createPipelineCommand = 6; closePipelineCommand = 7; + setNodeOperationalStateCommand = 8; } // TODO: once we start using protoc 3.x, refactor this message using "oneof" required Type commandType = 1; @@ -297,6 +298,7 @@ message SCMCommandProto { optional ReplicateContainerCommandProto replicateContainerCommandProto = 6; optional CreatePipelineCommandProto createPipelineCommandProto = 7; optional ClosePipelineCommandProto closePipelineCommandProto = 8; + optional SetNodeOperationalStateCommandProto setNodeOperationalStateCommandProto = 9; } /** @@ -384,6 +386,12 @@ message ClosePipelineCommandProto { required int64 cmdId = 2; } +message SetNodeOperationalStateCommandProto { + required int64 cmdId = 1; + required NodeOperationalState nodeOperationalState = 2; + 
required int64 stateExpiryEpochSeconds = 3; +} + /** * Protocol used from a datanode to StorageContainerManager. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java index 4d2d895..a1d47fd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java @@ -357,7 +357,8 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor { private void setNodeOpState(DatanodeAdminNodeDetails dn, HddsProtos.NodeOperationalState state) throws NodeNotFoundException { - nodeManager.setNodeOperationalState(dn.getDatanodeDetails(), state); + nodeManager.setNodeOperationalState(dn.getDatanodeDetails(), state, + dn.getMaintenanceEnd() / 1000); } private NodeStatus getNodeStatus(DatanodeDetails dnd) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java index c23fcd2..9c8a905 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminNodeDetails.java @@ -107,6 +107,14 @@ public class DatanodeAdminNodeDetails { } /** + * Returns the maintenance end time as milli seconds from the epoch. + * @return The maintenance end time, or zero if no end time is set. + */ + public long getMaintenanceEnd() { + return maintenanceEndTime; + } + + /** * Matches only on the DatanodeDetails field, which compares only the UUID * of the node to determine of they are the same object or not. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java index 06fe270..3258fef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java @@ -314,9 +314,15 @@ public class NodeDecommissionManager { throws NodeNotFoundException, InvalidNodeStateException { NodeStatus nodeStatus = getNodeStatus(dn); NodeOperationalState opState = nodeStatus.getOperationalState(); + + long maintenanceEnd = 0; + if (endInHours != 0) { + maintenanceEnd = + (System.currentTimeMillis() / 1000L) + (endInHours * 60L * 60L); + } if (opState == NodeOperationalState.IN_SERVICE) { nodeManager.setNodeOperationalState( - dn, NodeOperationalState.ENTERING_MAINTENANCE); + dn, NodeOperationalState.ENTERING_MAINTENANCE, maintenanceEnd); monitor.startMonitoring(dn, endInHours); LOG.info("Starting Maintenance for node {}", dn); } else if (nodeStatus.isMaintenance()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index b595d00..44ab581 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -142,6 +142,18 @@ public interface NodeManager extends StorageContainerNodeProtocol, NodeOperationalState newState) throws NodeNotFoundException; /** + * Set the operation state of a node. + * @param datanodeDetails The datanode to set the new state for + * @param newState The new operational state for the node + * @param opStateExpiryEpocSec Seconds from the epoch when the operational + * state should end. 
Zero indicates the state + * never end. + */ + void setNodeOperationalState(DatanodeDetails datanodeDetails, + NodeOperationalState newState, + long opStateExpiryEpocSec) throws NodeNotFoundException; + + /** * Get set of pipelines a datanode is part of. * @param datanodeDetails DatanodeDetails * @return Set of PipelineID diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index b16e79c..f18ac60 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -358,10 +358,28 @@ public class NodeStateManager implements Runnable, Closeable { */ public void setNodeOperationalState(DatanodeDetails dn, NodeOperationalState newState) throws NodeNotFoundException { + setNodeOperationalState(dn, newState, 0); + } + + /** + * Sets the operational state of the given node. Intended to be called when + * a node is being decommissioned etc. + * + * @param dn The datanode having its state set + * @param newState The new operational State of the node. + * @param stateExpiryEpochSec The number of seconds from the epoch when the + * operational state should expire. 
Passing zero + * indicates the state will never expire + */ + public void setNodeOperationalState(DatanodeDetails dn, + NodeOperationalState newState, + long stateExpiryEpochSec) throws NodeNotFoundException { DatanodeInfo dni = nodeStateMap.getNodeInfo(dn.getUuid()); NodeStatus oldStatus = dni.getNodeStatus(); - if (oldStatus.getOperationalState() != newState) { - nodeStateMap.updateNodeOperationalState(dn.getUuid(), newState); + if (oldStatus.getOperationalState() != newState || + oldStatus.getOpStateExpiryEpochSeconds() != stateExpiryEpochSec) { + nodeStateMap.updateNodeOperationalState( + dn.getUuid(), newState, stateExpiryEpochSec); // This will trigger an event based on the nodes health when the // operational state changes. Eg a node that was IN_MAINTENANCE goes // to IN_SERVICE + HEALTHY. This will trigger the HEALTHY node event to @@ -370,7 +388,9 @@ public class NodeStateManager implements Runnable, Closeable { // container replicas. Sometimes the event will do nothing, but it will // not do any harm either. Eg DECOMMISSIONING -> DECOMMISSIONED + HEALTHY // but the pipeline creation logic will ignore decommissioning nodes. - fireHealthStateEvent(oldStatus.getHealth(), dn); + if (oldStatus.getOperationalState() != newState) { + fireHealthStateEvent(oldStatus.getHealth(), dn); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java index b8a92e5..72ca015 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java @@ -25,17 +25,28 @@ import java.util.Objects; /** * This class is used to capture the current status of a datanode. This * includes its health (healthy, stale or dead) and its operation status ( - * in_service, decommissioned and maintenance mode. 
+ * in_service, decommissioned and maintenance mode) along with the expiry time + * for the operational state (used with maintenance mode). */ public class NodeStatus { private HddsProtos.NodeOperationalState operationalState; private HddsProtos.NodeState health; + private long opStateExpiryEpochSeconds; public NodeStatus(HddsProtos.NodeOperationalState operationalState, HddsProtos.NodeState health) { this.operationalState = operationalState; this.health = health; + this.opStateExpiryEpochSeconds = 0; + } + + public NodeStatus(HddsProtos.NodeOperationalState operationalState, + HddsProtos.NodeState health, + long opStateExpireEpocSeconds) { + this.operationalState = operationalState; + this.health = health; + this.opStateExpiryEpochSeconds = opStateExpireEpocSeconds; } public static NodeStatus inServiceHealthy() { @@ -61,6 +72,17 @@ public class NodeStatus { return operationalState; } + public long getOpStateExpiryEpochSeconds() { + return opStateExpiryEpochSeconds; + } + + public boolean operationalStateExpired() { + if (0 == opStateExpiryEpochSeconds) { + return false; + } + return System.currentTimeMillis() / 1000 >= opStateExpiryEpochSeconds; + } + /** * Returns true if the nodeStatus indicates the node is in any decommission * state. 
@@ -163,7 +185,8 @@ public class NodeStatus { } NodeStatus other = (NodeStatus) obj; if (this.operationalState == other.operationalState && - this.health == other.health) { + this.health == other.health + && this.opStateExpiryEpochSeconds == other.opStateExpiryEpochSeconds) { return true; } return false; @@ -171,12 +194,13 @@ public class NodeStatus { @Override public int hashCode() { - return Objects.hash(health, operationalState); + return Objects.hash(health, operationalState, opStateExpiryEpochSeconds); } @Override public String toString() { - return "OperationalState: "+operationalState+" Health: "+health; + return "OperationalState: "+operationalState+" Health: "+health+ + " OperationalStateExpiry: "+opStateExpiryEpochSeconds; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index d89dac1..3f17e32 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; @@ -64,11 +65,13 @@ import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import 
org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -231,7 +234,23 @@ public class SCMNodeManager implements NodeManager { @Override public void setNodeOperationalState(DatanodeDetails datanodeDetails, NodeOperationalState newState) throws NodeNotFoundException{ - nodeStateManager.setNodeOperationalState(datanodeDetails, newState); + setNodeOperationalState(datanodeDetails, newState, 0); + } + + /** + * Set the operation state of a node. + * @param datanodeDetails The datanode to set the new state for + * @param newState The new operational state for the node + * @param opStateExpiryEpocSec Seconds from the epoch when the operational + * state should end. Zero indicates the state + * never ends. + */ + @Override + public void setNodeOperationalState(DatanodeDetails datanodeDetails, + NodeOperationalState newState, long opStateExpiryEpocSec) + throws NodeNotFoundException{ + nodeStateManager.setNodeOperationalState( + datanodeDetails, newState, opStateExpiryEpocSec); } /** @@ -312,6 +331,7 @@ public class SCMNodeManager implements NodeManager { datanodeDetails.toString()); } } + registerInitialDatanodeOpState(datanodeDetails); return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success) .setDatanode(datanodeDetails) @@ -320,6 +340,29 @@ public class SCMNodeManager implements NodeManager { } /** + * When a node registers with SCM, the operational state stored on the + * datanode is the source of truth. Therefore, if the datanode reports + * anything other than IN_SERVICE on registration, the state in SCM should be + * updated to reflect the datanode state. 
+ * @param dn + */ + private void registerInitialDatanodeOpState(DatanodeDetails dn) { + try { + HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState(); + if (dnOpState != NodeOperationalState.IN_SERVICE) { + LOG.info("Updating nodeOperationalState on registration as the " + + "datanode has a persisted state of {} and expiry of {}", + dnOpState, dn.getPersistedOpStateExpiryEpochSec()); + setNodeOperationalState(dn, dnOpState, + dn.getPersistedOpStateExpiryEpochSec()); + } + } catch (NodeNotFoundException e) { + LOG.error("Unable to find the node when setting the operational state", + e); + } + } + + /** * Add an entry to the dnsToUuidMap, which maps hostname / IP to the DNs * running on that host. As each address can have many DNs running on it, * this is a one to many mapping. @@ -352,6 +395,7 @@ public class SCMNodeManager implements NodeManager { try { nodeStateManager.updateLastHeartbeatTime(datanodeDetails); metrics.incNumHBProcessed(); + updateDatanodeOpState(datanodeDetails); } catch (NodeNotFoundException e) { metrics.incNumHBProcessingFailed(); LOG.error("SCM trying to process heartbeat from an " + @@ -360,6 +404,41 @@ public class SCMNodeManager implements NodeManager { return commandQueue.getCommand(datanodeDetails.getUuid()); } + /** + * If the operational state or expiry reported in the datanode heartbeat do + * not match those stored in SCM, queue a command to update the state persisted + * on the datanode. Additionally, ensure the datanodeDetails stored in SCM + * match those reported in the heartbeat. + * This method should only be called when processing the + * heartbeat, and for a registered node, the information stored in SCM is the + * source of truth. + * @param reportedDn The DatanodeDetails taken from the node heartbeat. 
+ * @throws NodeNotFoundException + */ + private void updateDatanodeOpState(DatanodeDetails reportedDn) + throws NodeNotFoundException { + NodeStatus scmStatus = getNodeStatus(reportedDn); + if (scmStatus.getOperationalState() != reportedDn.getPersistedOpState() + || scmStatus.getOpStateExpiryEpochSeconds() + != reportedDn.getPersistedOpStateExpiryEpochSec()) { + LOG.info("Scheduling a command to update the operationalState " + + "persisted on the datanode as the reported value ({}, {}) does not " + + "match the value stored in SCM ({}, {})", + reportedDn.getPersistedOpState(), + reportedDn.getPersistedOpStateExpiryEpochSec(), + scmStatus.getOperationalState(), + scmStatus.getOpStateExpiryEpochSeconds()); + commandQueue.addCommand(reportedDn.getUuid(), + new SetNodeOperationalStateCommand( + Time.monotonicNow(), scmStatus.getOperationalState(), + scmStatus.getOpStateExpiryEpochSeconds())); + } + DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn); + scmDnd.setPersistedOpStateExpiryEpochSec( + reportedDn.getPersistedOpStateExpiryEpochSec()); + scmDnd.setPersistedOpState(reportedDn.getPersistedOpState()); + } + @Override public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { try { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java index 6565e81..3cf232a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java @@ -113,13 +113,14 @@ public class NodeStateMap { * @throws NodeNotFoundException if the node is not present */ public NodeStatus updateNodeOperationalState(UUID nodeId, - NodeOperationalState newOpState) throws NodeNotFoundException { + NodeOperationalState newOpState, long opStateExpiryEpochSeconds) + throws NodeNotFoundException { try { 
lock.writeLock().lock(); DatanodeInfo dn = getNodeInfo(nodeId); NodeStatus oldStatus = dn.getNodeStatus(); NodeStatus newStatus = new NodeStatus( - newOpState, oldStatus.getHealth()); + newOpState, oldStatus.getHealth(), opStateExpiryEpochSeconds); dn.setNodeStatus(newStatus); return newStatus; } finally { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index d569044..98a25d2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -68,6 +68,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; @@ -87,6 +88,7 @@ import static org.apache.hadoop.hdds.protocol.proto import static org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type .closePipelineCommand; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.setNodeOperationalStateCommand; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT; import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; @@ -350,6 +352,12 @@ public class SCMDatanodeProtocolServer implements .setClosePipelineCommandProto( ((ClosePipelineCommand)cmd).getProto()) .build(); + case setNodeOperationalStateCommand: + return builder + .setCommandType(setNodeOperationalStateCommand) + .setSetNodeOperationalStateCommandProto( + ((SetNodeOperationalStateCommand)cmd).getProto()) + .build(); default: throw new IllegalArgumentException("Scm command " + cmd.getType().toString() + " is not implemented"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index d24cbfd..da04b71 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -279,6 +279,16 @@ public class MockNodeManager implements NodeManager { } /** + * Set the operation state of a node. + * @param datanodeDetails The datanode to set the new state for + * @param newState The new operational state for the node + */ + public void setNodeOperationalState(DatanodeDetails datanodeDetails, + HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec) + throws NodeNotFoundException { + } + + /** * Get set of pipelines a datanode is part of. 
* @param dnId - datanodeID * @return Set of PipelineID diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index dad3448..6982d03 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -100,12 +100,20 @@ public class SimpleMockNodeManager implements NodeManager { @Override public void setNodeOperationalState(DatanodeDetails dn, HddsProtos.NodeOperationalState newState) throws NodeNotFoundException { + setNodeOperationalState(dn, newState, 0); + } + + @Override + public void setNodeOperationalState(DatanodeDetails dn, + HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec) + throws NodeNotFoundException { DatanodeInfo dni = nodeMap.get(dn.getUuid()); if (dni == null) { throw new NodeNotFoundException(); } dni.setNodeStatus( - new NodeStatus(newState, dni.getNodeStatus().getHealth())); + new NodeStatus( + newState, dni.getNodeStatus().getHealth(), opStateExpiryEpocSec)); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java index df62438..a8bcf54 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.ArrayList; import static junit.framework.TestCase.assertEquals; import static org.assertj.core.api.Fail.fail; +import static org.junit.Assert.assertNotEquals; /** * Unit tests for the decommision manager. 
@@ -182,8 +183,12 @@ public class TestNodeDecommissionManager { dns.get(2).getIpAddress()), 100); assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, nodeManager.getNodeStatus(dns.get(1)).getOperationalState()); + assertNotEquals(0, nodeManager.getNodeStatus( + dns.get(1)).getOpStateExpiryEpochSeconds()); assertEquals(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, nodeManager.getNodeStatus(dns.get(2)).getOperationalState()); + assertNotEquals(0, nodeManager.getNodeStatus( + dns.get(2)).getOpStateExpiryEpochSeconds()); // Running the command again gives no error - nodes already decommissioning // are silently ignored. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 0917fa4..b78a76f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; @@ -275,6 +276,35 @@ public class TestSCMNodeManager { } /** + * Ensure that a change to the operationalState of a node fires a datanode + * event of type SetNodeOperationalStateCommand. + */ + @Test + @Ignore // TODO - this test is no longer valid as the heartbeat processing + // now generates the command message. 
+ public void testSetNodeOpStateAndCommandFired() + throws IOException, NodeNotFoundException, AuthenticationException { + final int interval = 100; + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + MILLISECONDS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + DatanodeDetails dn = TestUtils.createRandomDatanodeAndRegister( + nodeManager); + long expiry = System.currentTimeMillis() / 1000 + 1000; + nodeManager.setNodeOperationalState(dn, + HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, expiry); + List<SCMCommand> commands = nodeManager.getCommandQueue(dn.getUuid()); + + Assert.assertTrue(commands.get(0).getClass().equals( + SetNodeOperationalStateCommand.class)); + assertEquals(1, commands.size()); + } + } + + /** * Asserts that a single node moves from Healthy to stale node, then from * stale node to dead node if it misses enough heartbeats. * diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java index 482f444..ad7139f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java @@ -82,11 +82,12 @@ public class TestNodeStateMap { NodeStatus expectedStatus = new NodeStatus( NodeOperationalState.DECOMMISSIONING, - NodeState.HEALTHY); + NodeState.HEALTHY, 999); NodeStatus returnedStatus = map.updateNodeOperationalState( - dn.getUuid(), expectedStatus.getOperationalState()); + dn.getUuid(), expectedStatus.getOperationalState(), 999); assertEquals(expectedStatus, returnedStatus); assertEquals(returnedStatus, map.getNodeStatus(dn.getUuid())); + assertEquals(999, returnedStatus.getOpStateExpiryEpochSeconds()); } @Test diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index ee96565..9654715 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -191,9 +191,22 @@ public class ReplicationNodeManagerMock implements NodeManager { @Override public void setNodeOperationalState(DatanodeDetails dd, HddsProtos.NodeOperationalState newState) throws NodeNotFoundException { + setNodeOperationalState(dd, newState, 0); + } + + /** + * Set the operation state of a node. + * @param dd The datanode to set the new state for + * @param newState The new operational state for the node + */ + @Override + public void setNodeOperationalState(DatanodeDetails dd, + HddsProtos.NodeOperationalState newState, long opStateExpiryEpocSec) + throws NodeNotFoundException { NodeStatus currentStatus = nodeStateMap.get(dd); if (currentStatus != null) { - nodeStateMap.put(dd, new NodeStatus(newState, currentStatus.getHealth())); + nodeStateMap.put(dd, new NodeStatus(newState, currentStatus.getHealth(), + opStateExpiryEpocSec)); } else { throw new NodeNotFoundException(); } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org