HDDS-212. Introduce NodeStateManager to manage the state of Datanodes in SCM. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71df8c27 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71df8c27 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71df8c27 Branch: refs/heads/HDFS-12090 Commit: 71df8c27c9a0e326232d3baf16414a63b5ea5a4b Parents: 3b63715 Author: Nanda kumar <[email protected]> Authored: Thu Jul 5 02:11:10 2018 +0530 Committer: Nanda kumar <[email protected]> Committed: Thu Jul 5 02:11:10 2018 +0530 ---------------------------------------------------------------------- .../scm/client/ContainerOperationClient.java | 8 +- .../hadoop/hdds/protocol/DatanodeDetails.java | 13 +- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 - .../hadoop/hdds/scm/client/ScmClient.java | 5 +- .../StorageContainerLocationProtocol.java | 5 +- ...rLocationProtocolClientSideTranslatorPB.java | 8 +- ...rLocationProtocolServerSideTranslatorPB.java | 8 +- .../StorageContainerLocationProtocol.proto | 19 +- hadoop-hdds/common/src/main/proto/hdds.proto | 13 +- .../common/src/main/resources/ozone-default.xml | 11 - .../apache/hadoop/hdds/scm/HddsServerUtil.java | 11 - .../protocol/StorageContainerNodeProtocol.java | 4 +- .../hadoop/hdds/scm/node/DatanodeInfo.java | 109 ++++ .../hdds/scm/node/HeartbeatQueueItem.java | 98 ---- .../hadoop/hdds/scm/node/NodeManager.java | 16 +- .../hadoop/hdds/scm/node/NodeStateManager.java | 575 +++++++++++++++++++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 506 ++-------------- .../node/states/NodeAlreadyExistsException.java | 45 ++ .../hdds/scm/node/states/NodeException.java | 44 ++ .../scm/node/states/NodeNotFoundException.java | 49 ++ .../hdds/scm/node/states/NodeStateMap.java | 281 +++++++++ .../scm/server/SCMClientProtocolServer.java | 60 +- .../server/SCMDatanodeHeartbeatDispatcher.java | 2 +- .../scm/server/SCMDatanodeProtocolServer.java | 2 +- .../hdds/scm/container/MockNodeManager.java | 58 +- 
.../hdds/scm/node/TestContainerPlacement.java | 10 +- .../hadoop/hdds/scm/node/TestNodeManager.java | 176 ++---- .../testutils/ReplicationNodeManagerMock.java | 37 +- .../ozone/TestStorageContainerManager.java | 4 +- .../hadoop/ozone/scm/node/TestQueryNode.java | 19 +- .../hadoop/ozone/ksm/KeySpaceManager.java | 6 +- 31 files changed, 1288 insertions(+), 918 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index b04f8c4..e7bdaf0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -37,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.EnumSet; import java.util.List; import java.util.UUID; @@ -234,14 +233,14 @@ public class ContainerOperationClient implements ScmClient { /** * Returns a set of Nodes that meet a query criteria. * - * @param nodeStatuses - A set of criteria that we want the node to have. + * @param nodeStatuses - Criteria that we want the node to have. * @param queryScope - Query scope - Cluster or pool. * @param poolName - if it is pool, a pool name is required. * @return A set of nodes that meet the requested criteria. 
* @throws IOException */ @Override - public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> + public List<HddsProtos.Node> queryNode(HddsProtos.NodeState nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) throws IOException { return storageContainerLocationClient.queryNode(nodeStatuses, queryScope, @@ -458,7 +457,8 @@ public class ContainerOperationClient implements ScmClient { */ @Override public long getContainerSize(long containerID) throws IOException { - // TODO : Fix this, it currently returns the capacity but not the current usage. + // TODO : Fix this, it currently returns the capacity + // but not the current usage. long size = getContainerSizeB(); if (size == -1) { throw new IOException("Container size unknown!"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index c373e22..bae22a2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -35,7 +35,7 @@ import java.util.UUID; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public final class DatanodeDetails implements Comparable<DatanodeDetails> { +public class DatanodeDetails implements Comparable<DatanodeDetails> { /** * DataNode's unique identifier in the cluster. 
@@ -63,6 +63,13 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> { this.ports = ports; } + protected DatanodeDetails(DatanodeDetails datanodeDetails) { + this.uuid = datanodeDetails.uuid; + this.ipAddress = datanodeDetails.ipAddress; + this.hostName = datanodeDetails.hostName; + this.ports = datanodeDetails.ports; + } + /** * Returns the DataNode UUID. * @@ -238,7 +245,7 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> { /** * Builder class for building DatanodeDetails. */ - public static class Builder { + public static final class Builder { private String id; private String ipAddress; private String hostName; @@ -324,7 +331,7 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> { /** * Container to hold DataNode Port details. */ - public static class Port { + public static final class Port { /** * Ports that are supported in DataNode. http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index df6fbf0..ad326dc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -165,10 +165,6 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_DEADNODE_INTERVAL_DEFAULT = "10m"; - public static final String OZONE_SCM_MAX_HB_COUNT_TO_PROCESS = - "ozone.scm.max.hb.count.to.process"; - public static final int OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT = 5000; - public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL = "ozone.scm.heartbeat.thread.interval"; public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT = 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index ecb2173..7955179 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.IOException; -import java.util.EnumSet; import java.util.List; /** @@ -150,13 +149,13 @@ public interface ScmClient { /** * Returns a set of Nodes that meet a query criteria. - * @param nodeStatuses - A set of criteria that we want the node to have. + * @param nodeStatuses - Criteria that we want the node to have. * @param queryScope - Query scope - Cluster or pool. * @param poolName - if it is pool, a pool name is required. * @return A set of nodes that meet the requested criteria. 
* @throws IOException */ - HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses, + List<HddsProtos.Node> queryNode(HddsProtos.NodeState nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index b787409..581fbd0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; import java.io.IOException; -import java.util.EnumSet; import java.util.List; /** @@ -94,10 +93,10 @@ public interface StorageContainerLocationProtocol { /** * Queries a list of Node Statuses. - * @param nodeStatuses + * @param state * @return List of Datanodes. 
*/ - HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses, + List<HddsProtos.Node> queryNode(HddsProtos.NodeState state, HddsProtos.QueryScope queryScope, String poolName) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 4b03d12..ac12ea2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -59,7 +59,6 @@ import org.apache.hadoop.ipc.RPC; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.EnumSet; import java.util.List; /** @@ -215,20 +214,19 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB * @return List of Datanodes. */ @Override - public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> + public List<HddsProtos.Node> queryNode(HddsProtos.NodeState nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) throws IOException { // TODO : We support only cluster wide query right now. 
So ignoring checking // queryScope and poolName Preconditions.checkNotNull(nodeStatuses); - Preconditions.checkState(nodeStatuses.size() > 0); NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder() - .addAllQuery(nodeStatuses) + .setState(nodeStatuses) .setScope(queryScope).setPoolName(poolName).build(); try { NodeQueryResponseProto response = rpcProxy.queryNode(NULL_RPC_CONTROLLER, request); - return response.getDatanodes(); + return response.getDatanodesList(); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index d66919f..9175ebf 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import java.io.IOException; -import java.util.EnumSet; import java.util.List; /** @@ -171,13 +170,12 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB StorageContainerLocationProtocolProtos.NodeQueryRequestProto request) throws ServiceException { try { - EnumSet<HddsProtos.NodeState> nodeStateEnumSet = EnumSet.copyOf(request - .getQueryList()); - HddsProtos.NodePool datanodes = impl.queryNode(nodeStateEnumSet, + HddsProtos.NodeState nodeState = 
request.getState(); + List<HddsProtos.Node> datanodes = impl.queryNode(nodeState, request.getScope(), request.getPoolName()); return StorageContainerLocationProtocolProtos .NodeQueryResponseProto.newBuilder() - .setDatanodes(datanodes) + .addAllDatanodes(datanodes) .build(); } catch (Exception e) { throw new ServiceException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index 143c2ae..68cc35f 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -118,26 +118,13 @@ message ObjectStageChangeResponseProto { match the NodeState that we are requesting. */ message NodeQueryRequestProto { - - - // Repeated, So we can specify more than one status type. - // These NodeState types are additive for now, in the sense that - // if you specify HEALTHY and FREE_NODE members -- - // Then you get all healthy node which are not raft members. - // - // if you specify all healthy and dead nodes, you will get nothing - // back. Server is not going to dictate what combinations make sense, - // it is entirely up to the caller. - // TODO: Support operators like OR and NOT. Currently it is always an - // implied AND. - - repeated NodeState query = 1; + required NodeState state = 1; required QueryScope scope = 2; optional string poolName = 3; // if scope is pool, then pool name is needed. } message NodeQueryResponseProto { - required NodePool datanodes = 1; + repeated Node datanodes = 1; } /** @@ -194,7 +181,7 @@ service StorageContainerLocationProtocolService { /** * Returns a set of Nodes that meet a criteria. 
*/ - rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto); + rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto); /** * Notify from client when begin or finish container or pipeline operations on datanodes. http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/proto/hdds.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 1c9ee19..b9def2a 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -69,14 +69,11 @@ enum NodeType { * and getNodeCount. */ enum NodeState { - HEALTHY = 1; - STALE = 2; - DEAD = 3; - DECOMMISSIONING = 4; - DECOMMISSIONED = 5; - RAFT_MEMBER = 6; - FREE_NODE = 7; // Not a member in raft. - INVALID = 8; + HEALTHY = 1; + STALE = 2; + DEAD = 3; + DECOMMISSIONING = 4; + DECOMMISSIONED = 5; } enum QueryScope { http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 25365c8..568d267 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -774,17 +774,6 @@ </description> </property> <property> - <name>ozone.scm.max.hb.count.to.process</name> - <value>5000</value> - <tag>OZONE, MANAGEMENT, PERFORMANCE</tag> - <description> - The maximum number of heartbeat to process per loop of the - heartbeat process thread. Please see - ozone.scm.heartbeat.thread.interval - for more info. 
- </description> - </property> - <property> <name>ozone.scm.names</name> <value/> <tag>OZONE</tag> http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java index c734d9b..cc7adbf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java @@ -259,17 +259,6 @@ public final class HddsServerUtil { } /** - * Returns the maximum number of heartbeat to process per loop of the process - * thread. - * @param conf Configuration - * @return - int -- Number of HBs to process - */ - public static int getMaxHBToProcessPerLoop(Configuration conf) { - return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, - ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); - } - - /** * Timeout value for the RPC from Datanode to SCM, primarily used for * Heartbeats and container reports. 
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index 790f58a..c9ef43f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -59,10 +59,8 @@ public interface StorageContainerNodeProtocol { /** * Send heartbeat to indicate the datanode is alive and doing well. * @param datanodeDetails - Datanode ID. - * @param nodeReport - node report. * @return SCMheartbeat response list */ - List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails, - NodeReportProto nodeReport); + List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java new file mode 100644 index 0000000..51465ee --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.util.Time; + +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This class extends the primary identifier of a Datanode with ephemeral + * state, eg last reported time, usage information etc. + */ +public class DatanodeInfo extends DatanodeDetails { + + private final ReadWriteLock lock; + + private volatile long lastHeartbeatTime; + private long lastStatsUpdatedTime; + + // If required we can dissect StorageReportProto and store the raw data + private List<StorageReportProto> storageReports; + + /** + * Constructs DatanodeInfo from DatanodeDetails. + * + * @param datanodeDetails Details about the datanode + */ + public DatanodeInfo(DatanodeDetails datanodeDetails) { + super(datanodeDetails); + lock = new ReentrantReadWriteLock(); + lastHeartbeatTime = Time.monotonicNow(); + } + + /** + * Updates the last heartbeat time with current time. + */ + public void updateLastHeartbeatTime() { + try { + lock.writeLock().lock(); + lastHeartbeatTime = Time.monotonicNow(); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Returns the last heartbeat time. 
+ * + * @return last heartbeat time. + */ + public long getLastHeartbeatTime() { + try { + lock.readLock().lock(); + return lastHeartbeatTime; + } finally { + lock.readLock().unlock(); + } + } + + /** + * Updates the datanode storage reports. + * + * @param reports list of storage report + */ + public void updateStorageReports(List<StorageReportProto> reports) { + try { + lock.writeLock().lock(); + lastStatsUpdatedTime = Time.monotonicNow(); + storageReports = reports; + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Returns the storage reports associated with this datanode. + * + * @return list of storage report + */ + public List<StorageReportProto> getStorageReports() { + try { + lock.readLock().lock(); + return storageReports; + } finally { + lock.readLock().unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java deleted file mode 100644 index 04658bd..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; - -import static org.apache.hadoop.util.Time.monotonicNow; - -/** - * This class represents the item in SCM heartbeat queue. - */ -public class HeartbeatQueueItem { - private DatanodeDetails datanodeDetails; - private long recvTimestamp; - private NodeReportProto nodeReport; - - /** - * - * @param datanodeDetails - datanode ID of the heartbeat. - * @param recvTimestamp - heartbeat receive timestamp. - * @param nodeReport - node report associated with the heartbeat if any. - */ - HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp, - NodeReportProto nodeReport) { - this.datanodeDetails = datanodeDetails; - this.recvTimestamp = recvTimestamp; - this.nodeReport = nodeReport; - } - - /** - * @return datanode ID. - */ - public DatanodeDetails getDatanodeDetails() { - return datanodeDetails; - } - - /** - * @return node report. - */ - public NodeReportProto getNodeReport() { - return nodeReport; - } - - /** - * @return heartbeat receive timestamp. - */ - public long getRecvTimestamp() { - return recvTimestamp; - } - - /** - * Builder for HeartbeatQueueItem. 
- */ - public static class Builder { - private DatanodeDetails datanodeDetails; - private NodeReportProto nodeReport; - private long recvTimestamp = monotonicNow(); - - public Builder setDatanodeDetails(DatanodeDetails dnDetails) { - this.datanodeDetails = dnDetails; - return this; - } - - public Builder setNodeReport(NodeReportProto report) { - this.nodeReport = report; - return this; - } - - @VisibleForTesting - public Builder setRecvTimestamp(long recvTime) { - this.recvTimestamp = recvTime; - return this; - } - - public HeartbeatQueueItem build() { - return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 72d7e94..c13c37c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -17,10 +17,9 @@ */ package org.apache.hadoop.hdds.scm.node; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; @@ -54,14 +53,14 @@ import java.util.UUID; * list, by calling removeNode. 
We will throw away this nodes info soon. */ public interface NodeManager extends StorageContainerNodeProtocol, - NodeManagerMXBean, Closeable, Runnable { + NodeManagerMXBean, Closeable { /** * Removes a data node from the management of this Node Manager. * * @param node - DataNode. - * @throws UnregisteredNodeException + * @throws NodeNotFoundException */ - void removeNode(DatanodeDetails node) throws UnregisteredNodeException; + void removeNode(DatanodeDetails node) throws NodeNotFoundException; /** * Gets all Live Datanodes that is currently communicating with SCM. @@ -124,13 +123,6 @@ public interface NodeManager extends StorageContainerNodeProtocol, SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails); /** - * Wait for the heartbeat is processed by NodeManager. - * @return true if heartbeat has been processed. - */ - @VisibleForTesting - boolean waitForHeartbeatProcessed(); - - /** * Returns the node state of a specific node. * @param datanodeDetails DatanodeDetails * @return Healthy/Stale/Dead.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
new file mode 100644
index 0000000..5543c04
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.node.states.NodeStateMap;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongPredicate;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * NodeStateManager maintains the state of all the datanodes in the cluster. All
+ * the node state change should happen only via NodeStateManager. It also
+ * runs a heartbeat thread which periodically updates the node state.
+ * <p>
+ * The getNode(byState) functions make copy of node maps and then creates a list
+ * based on that. It should be assumed that these get functions always report
+ * *stale* information. For example, getting the deadNodeCount followed by
+ * getNodes(DEAD) could very well produce totally different count. Also
+ * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE), is not
+ * guaranteed to add up to the total nodes that we know of. Please treat all
+ * get functions in this file as a snap-shot of information that is inconsistent
+ * as soon as you read it.
+ */
+public class NodeStateManager implements Runnable, Closeable {
+
+  /**
+   * Node's life cycle events.
+   */
+  private enum NodeLifeCycleEvent {
+    TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED
+  }
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(NodeStateManager.class);
+
+  /**
+   * StateMachine for node lifecycle.
+   */
+  private final StateMachine<NodeState, NodeLifeCycleEvent> stateMachine;
+  /**
+   * This is the map which maintains the current state of all datanodes.
+   */
+  private final NodeStateMap nodeStateMap;
+  /**
+   * ExecutorService used for scheduling heartbeat processing thread.
+   */
+  private final ScheduledExecutorService executorService;
+  /**
+   * The frequency in which we have run the heartbeat processing thread.
+   */
+  private final long heartbeatCheckerIntervalMs;
+  /**
+   * The timeout value which will be used for marking a datanode as stale.
+   */
+  private final long staleNodeIntervalMs;
+  /**
+   * The timeout value which will be used for marking a datanode as dead.
+   */
+  private final long deadNodeIntervalMs;
+
+  /**
+   * Constructs a NodeStateManager instance with the given configuration.
+   *
+   * @param conf Configuration
+   */
+  public NodeStateManager(Configuration conf) {
+    nodeStateMap = new NodeStateMap();
+    Set<NodeState> finalStates = new HashSet<>();
+    finalStates.add(NodeState.DECOMMISSIONED);
+    this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates);
+    initializeStateMachine();
+    heartbeatCheckerIntervalMs = HddsServerUtil
+        .getScmheartbeatCheckerInterval(conf);
+    staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf);
+    deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf);
+    Preconditions.checkState(heartbeatCheckerIntervalMs > 0,
+        OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + " should be greater than 0.");
+    Preconditions.checkState(staleNodeIntervalMs < deadNodeIntervalMs,
+        OZONE_SCM_STALENODE_INTERVAL + " should be less than " +
+            OZONE_SCM_DEADNODE_INTERVAL);
+    executorService = HadoopExecutors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
+    executorService.schedule(this, heartbeatCheckerIntervalMs,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /*
+   *
+   * Node and State Transition Mapping:
+   *
+   * State: HEALTHY         -------------------> STALE
+   * Event:                       TIMEOUT
+   *
+   * State: STALE           -------------------> DEAD
+   * Event:                       TIMEOUT
+   *
+   * State: STALE           -------------------> HEALTHY
+   * Event:                       RESTORE
+   *
+   * State: DEAD            -------------------> HEALTHY
+   * Event:                       RESURRECT
+   *
+   * State: HEALTHY         -------------------> DECOMMISSIONING
+   * Event:                     DECOMMISSION
+   *
+   * State: STALE           -------------------> DECOMMISSIONING
+   * Event:                     DECOMMISSION
+   *
+   * State: DEAD            -------------------> DECOMMISSIONING
+   * Event:                     DECOMMISSION
+   *
+   * State: DECOMMISSIONING -------------------> DECOMMISSIONED
+   * Event:                     DECOMMISSIONED
+   *
+   *  Node State Flow
+   *
+   *  +--------------------------------------------------------+
+   *  |                                     (RESURRECT)        |
+   *  |   +--------------------------+                         |
+   *  |   |      (RESTORE)           |                         |
+   *  |   |                          |                         |
+   *  V   V                          |                         |
+   * [HEALTHY]------------------->[STALE]------------------->[DEAD]
+   *    |         (TIMEOUT)          |         (TIMEOUT)       |
+   *    |                            |                         |
+   *    |                            |                         |
+   *    |                            |                         |
+   *    |                            |                         |
+   *    | (DECOMMISSION)             | (DECOMMISSION)          | (DECOMMISSION)
+   *    |                            V                         |
+   *    +------------------->[DECOMMISSIONING]<----------------+
+   *                                 |
+   *                                 | (DECOMMISSIONED)
+   *                                 |
+   *                                 V
+   *                          [DECOMMISSIONED]
+   *
+   */
+
+  /**
+   * Initializes the lifecycle of node state machine.
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(
+        NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
+    stateMachine.addTransition(
+        NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
+    stateMachine.addTransition(
+        NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE);
+    stateMachine.addTransition(
+        NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT);
+    stateMachine.addTransition(
+        NodeState.HEALTHY, NodeState.DECOMMISSIONING,
+        NodeLifeCycleEvent.DECOMMISSION);
+    stateMachine.addTransition(
+        NodeState.STALE, NodeState.DECOMMISSIONING,
+        NodeLifeCycleEvent.DECOMMISSION);
+    stateMachine.addTransition(
+        NodeState.DEAD, NodeState.DECOMMISSIONING,
+        NodeLifeCycleEvent.DECOMMISSION);
+    stateMachine.addTransition(
+        NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
+        NodeLifeCycleEvent.DECOMMISSIONED);
+  }
+
+  /**
+   * Adds a new node to the state manager.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @throws NodeAlreadyExistsException if the node is already present
+   */
+  public void addNode(DatanodeDetails datanodeDetails)
+      throws NodeAlreadyExistsException {
+    nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState());
+  }
+
+  /**
+   * Get information about the node.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @return DatanodeInfo
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public DatanodeInfo getNode(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    return nodeStateMap.getNodeInfo(datanodeDetails.getUuid());
+  }
+
+  /**
+   * Updates the last heartbeat time of the node.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public void updateLastHeartbeatTime(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    nodeStateMap.getNodeInfo(datanodeDetails.getUuid())
+        .updateLastHeartbeatTime();
+  }
+
+  /**
+   * Returns the current state of the node.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @return NodeState
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public NodeState getNodeState(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    return nodeStateMap.getNodeState(datanodeDetails.getUuid());
+  }
+
+  /**
+   * Returns all the node which are in healthy state.
+   *
+   * @return list of healthy nodes
+   */
+  public List<DatanodeDetails> getHealthyNodes() {
+    return getNodes(NodeState.HEALTHY);
+  }
+
+  /**
+   * Returns all the node which are in stale state.
+   *
+   * @return list of stale nodes
+   */
+  public List<DatanodeDetails> getStaleNodes() {
+    return getNodes(NodeState.STALE);
+  }
+
+  /**
+   * Returns all the node which are in dead state.
+   *
+   * @return list of dead nodes
+   */
+  public List<DatanodeDetails> getDeadNodes() {
+    return getNodes(NodeState.DEAD);
+  }
+
+  /**
+   * Returns all the node which are in the specified state.
+   *
+   * @param state NodeState
+   *
+   * @return list of nodes
+   */
+  public List<DatanodeDetails> getNodes(NodeState state) {
+    return getDatanodeDetails(nodeStateMap.getNodes(state));
+  }
+
+  /**
+   * Returns all the nodes which have registered to NodeStateManager.
+   *
+   * @return all the managed nodes
+   */
+  public List<DatanodeDetails> getAllNodes() {
+    return getDatanodeDetails(nodeStateMap.getAllNodes());
+  }
+
+  /**
+   * Resolves the given node ids to their DatanodeDetails, skipping (and
+   * logging) any id that is no longer present in the NodeStateMap.
+   *
+   * @param uuids ids of the nodes to resolve
+   *
+   * @return list of DatanodeDetails, in the iteration order of uuids
+   */
+  private List<DatanodeDetails> getDatanodeDetails(Iterable<UUID> uuids) {
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (UUID uuid : uuids) {
+      try {
+        nodes.add(nodeStateMap.getNodeDetails(uuid));
+      } catch (NodeNotFoundException e) {
+        // This should not happen unless someone else other than
+        // NodeStateManager is directly modifying NodeStateMap and removed
+        // the node entry after we got the list of UUIDs.
+        LOG.error("Inconsistent NodeStateMap! {}", nodeStateMap, e);
+      }
+    }
+    return nodes;
+  }
+
+  /**
+   * Returns the count of healthy nodes.
+   *
+   * @return healthy node count
+   */
+  public int getHealthyNodeCount() {
+    return getNodeCount(NodeState.HEALTHY);
+  }
+
+  /**
+   * Returns the count of stale nodes.
+   *
+   * @return stale node count
+   */
+  public int getStaleNodeCount() {
+    return getNodeCount(NodeState.STALE);
+  }
+
+  /**
+   * Returns the count of dead nodes.
+   *
+   * @return dead node count
+   */
+  public int getDeadNodeCount() {
+    return getNodeCount(NodeState.DEAD);
+  }
+
+  /**
+   * Returns the count of nodes in specified state.
+   *
+   * @param state NodeState
+   *
+   * @return node count
+   */
+  public int getNodeCount(NodeState state) {
+    return nodeStateMap.getNodeCount(state);
+  }
+
+  /**
+   * Returns the count of all nodes managed by NodeStateManager.
+   *
+   * @return node count
+   */
+  public int getTotalNodeCount() {
+    return nodeStateMap.getTotalNodeCount();
+  }
+
+  /**
+   * Removes a node from NodeStateManager.
+   *
+   * @param datanodeDetails DatanodeDetails
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  public void removeNode(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    nodeStateMap.removeNode(datanodeDetails.getUuid());
+  }
+
+  /**
+   * Periodic heartbeat-checker task. Moves Stale or Dead nodes to Healthy if
+   * we got a heartbeat from them, moves Healthy nodes to Stale and Stale
+   * nodes to Dead if needed, and then schedules the next run.
+   * <p>
+   * Any unexpected RuntimeException raised while processing node states is
+   * logged rather than propagated: this task re-arms itself with a one-shot
+   * schedule() call, so an escaping exception would silently stop all
+   * further node state processing.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    try {
+      processNodeStates();
+    } catch (RuntimeException e) {
+      LOG.error("Unexpected error while processing datanode states.", e);
+    }
+    scheduleNextRun();
+  }
+
+  /**
+   * Evaluates the last heartbeat time of every node against the configured
+   * stale and dead intervals and applies the resulting state transitions.
+   * A node that disappears while it is being processed is logged and
+   * skipped; it does not abort processing of the remaining nodes.
+   */
+  private void processNodeStates() {
+    /*
+     *
+     *          staleNodeDeadline                healthyNodeDeadline
+     *                 |                                  |
+     *      Dead       |             Stale                |     Healthy
+     *      Node       |             Node                 |     Node
+     *      Window     |             Window               |     Window
+     * ----------------+----------------------------------+------------------->
+     *                      >>-->> time-line >>-->>
+     *
+     * Here is the logic of computing the health of a node.
+     *
+     * 1. We get the current time and look back that the time
+     *    when we got a heartbeat from a node.
+     *
+     * 2. If the last heartbeat was within the window of healthy node we mark
+     *    it as healthy.
+     *
+     * 3. If the last HB Time stamp is longer and falls within the window of
+     *    Stale Node time, we will mark it as Stale.
+     *
+     * 4. If the last HB time is older than the Stale Window, then the node is
+     *    marked as dead.
+     *
+     * The Processing starts from current time and looks backwards in time.
+     */
+    long processingStartTime = Time.monotonicNow();
+    // After this time node is considered to be stale.
+    long healthyNodeDeadline = processingStartTime - staleNodeIntervalMs;
+    // After this time node is considered to be dead.
+    long staleNodeDeadline = processingStartTime - deadNodeIntervalMs;
+
+    LongPredicate healthyNodeCondition =
+        (lastHbTime) -> lastHbTime >= healthyNodeDeadline;
+    // staleNodeCondition is superset of stale and dead node
+    LongPredicate staleNodeCondition =
+        (lastHbTime) -> lastHbTime < healthyNodeDeadline;
+    LongPredicate deadNodeCondition =
+        (lastHbTime) -> lastHbTime < staleNodeDeadline;
+
+    for (NodeState state : NodeState.values()) {
+      for (UUID id : nodeStateMap.getNodes(state)) {
+        try {
+          DatanodeInfo node = nodeStateMap.getNodeInfo(id);
+          switch (state) {
+          case HEALTHY:
+            // Move the node to STALE if the last heartbeat time is less than
+            // configured stale-node interval.
+            updateNodeState(node, staleNodeCondition, state,
+                NodeLifeCycleEvent.TIMEOUT);
+            break;
+          case STALE:
+            // Move the node to DEAD if the last heartbeat time is less than
+            // configured dead-node interval. Only if it was not moved, restore
+            // it when we have received a heartbeat before the configured
+            // stale-node interval; a node that has just been marked DEAD must
+            // not also be transitioned out of STALE in the same pass.
+            if (!updateNodeState(node, deadNodeCondition, state,
+                NodeLifeCycleEvent.TIMEOUT)) {
+              updateNodeState(node, healthyNodeCondition, state,
+                  NodeLifeCycleEvent.RESTORE);
+            }
+            break;
+          case DEAD:
+            // Resurrect the node if we have received heartbeat before
+            // configured stale-node interval.
+            updateNodeState(node, healthyNodeCondition, state,
+                NodeLifeCycleEvent.RESURRECT);
+            break;
+          // We don't do anything for DECOMMISSIONING and DECOMMISSIONED in
+          // heartbeat processing.
+          case DECOMMISSIONING:
+          case DECOMMISSIONED:
+          default:
+            break;
+          }
+        } catch (NodeNotFoundException e) {
+          // This should not happen unless someone else other than
+          // NodeStateManager is directly modifying NodeStateMap and removed
+          // the node entry after we got the list of UUIDs.
+          LOG.error("Inconsistent NodeStateMap! {}", nodeStateMap, e);
+        }
+      }
+    }
+
+    long processingTimeMs = Time.monotonicNow() - processingStartTime;
+    // If we have taken too much time for HB processing, log that information.
+    if (processingTimeMs > heartbeatCheckerIntervalMs) {
+      LOG.error("Total time spent processing datanode HB's is greater than " +
+          "configured values for datanode heartbeats. Please adjust the" +
+          " heartbeat configs. Time spent on HB processing: {} ms " +
+          "Heartbeat checker interval: {} ms.",
+          processingTimeMs, heartbeatCheckerIntervalMs);
+    }
+  }
+
+  /**
+   * Schedules the next heartbeat check, unless the current thread has been
+   * interrupted or the executor has been shut down.
+   */
+  private void scheduleNextRun() {
+    // we purposefully make this non-deterministic. Instead of using a
+    // scheduleAtFixedFrequency we will just go to sleep
+    // and wake up at the next rendezvous point, which is currentTime +
+    // heartbeatCheckerIntervalMs. This leads to the issue that we are now
+    // heart beating not at a fixed cadence, but clock tick + time taken to
+    // work.
+    //
+    // This time taken to work can skew the heartbeat processor thread.
+    // The reason why we don't care is because of the following reasons.
+    //
+    // 1. checkerInterval is general many magnitudes faster than datanode HB
+    // frequency.
+    //
+    // 2. if we have too much nodes, the SCM would be doing only HB
+    // processing, this could lead to SCM's CPU starvation. With this
+    // approach we always guarantee that HB thread sleeps for a little while.
+    //
+    // 3. It is possible that we will never finish processing the HB's in the
+    // thread. But that means we have a mis-configured system. We will warn
+    // the users by logging that information.
+    //
+    // 4. And the most important reason, heartbeats are not blocked even if
+    // this thread does not run: recording a heartbeat does not depend on this
+    // thread, and the recorded time is simply evaluated on the next run.
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      executorService.schedule(this, heartbeatCheckerIntervalMs,
+          TimeUnit.MILLISECONDS);
+    } else {
+      LOG.info("Current Thread is interrupted, shutting down HB processing " +
+          "thread for Node Manager.");
+    }
+  }
+
+  /**
+   * Updates the node state if the condition satisfies.
+   *
+   * @param node DatanodeInfo
+   * @param condition condition to check against the node's last heartbeat
+   *                  time
+   * @param state current state of node
+   * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition
+   *                       matches
+   *
+   * @return true if the node was moved to a new state, false otherwise
+   *
+   * @throws NodeNotFoundException if the node is not present
+   */
+  private boolean updateNodeState(DatanodeInfo node, LongPredicate condition,
+      NodeState state, NodeLifeCycleEvent lifeCycleEvent)
+      throws NodeNotFoundException {
+    try {
+      if (condition.test(node.getLastHeartbeatTime())) {
+        NodeState newState = stateMachine.getNextState(state, lifeCycleEvent);
+        nodeStateMap.updateNodeState(node.getUuid(), state, newState);
+        return true;
+      }
+    } catch (InvalidStateTransitionException e) {
+      LOG.warn("Invalid state transition of node {}." +
+              " Current state: {}, life cycle event: {}",
+          node, state, lifeCycleEvent);
+    }
+    return false;
+  }
+
+  /**
+   * Stops the heartbeat processing thread, waiting up to 5 seconds for it to
+   * finish before forcing a shutdown.
+   */
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown NodeStateManager properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index adca8ea..15ac3f2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
 import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.TypedEvent; -import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; @@ -50,8 +49,6 @@ import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,39 +60,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .HEALTHY; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .INVALID; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; -import static org.apache.hadoop.util.Time.monotonicNow; /** * Maintains information about the Datanodes on SCM side. * <p> * Heartbeats under SCM is very simple compared to HDFS heartbeatManager. * <p> - * Here we maintain 3 maps, and we propagate a node from healthyNodesMap to - * staleNodesMap to deadNodesMap. This moving of a node from one map to another - * is controlled by 4 configuration variables. These variables define how many - * heartbeats must go missing for the node to move from one map to another. - * <p> - * Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The - * worker thread wakes up and grabs that heartbeat from the queue. The worker - * thread will lookup the healthynodes map and set the timestamp if the entry - * is there. if not it will look up stale and deadnodes map. - * <p> * The getNode(byState) functions make copy of node maps and then creates a list * based on that. It should be assumed that these get functions always report * *stale* information. For example, getting the deadNodeCount followed by @@ -113,33 +86,18 @@ public class SCMNodeManager static final Logger LOG = LoggerFactory.getLogger(SCMNodeManager.class); - /** - * Key = NodeID, value = timestamp. 
- */ - private final ConcurrentHashMap<UUID, Long> healthyNodes; - private final ConcurrentHashMap<UUID, Long> staleNodes; - private final ConcurrentHashMap<UUID, Long> deadNodes; - private final Queue<HeartbeatQueueItem> heartbeatQueue; - private final ConcurrentHashMap<UUID, DatanodeDetails> nodes; + + private final NodeStateManager nodeStateManager; // Individual live node stats + // TODO: NodeStat should be moved to NodeStatemanager (NodeStateMap) private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats; + // Should we maintain aggregated stats? If this is not frequently used, we + // can always calculate it from nodeStats whenever required. // Aggregated node stats private SCMNodeStat scmStat; - // TODO: expose nodeStats and scmStat as metrics - private final AtomicInteger healthyNodeCount; - private final AtomicInteger staleNodeCount; - private final AtomicInteger deadNodeCount; - private final AtomicInteger totalNodes; - private long staleNodeIntervalMs; - private final long deadNodeIntervalMs; - private final long heartbeatCheckerIntervalMs; - private final long datanodeHBIntervalSeconds; - private final ScheduledExecutorService executorService; - private long lastHBcheckStart; - private long lastHBcheckFinished = 0; - private long lastHBProcessedCount; + // Should we create ChillModeManager and extract all the chill mode logic + // to a new class? 
private int chillModeNodeCount; - private final int maxHBToProcessPerLoop; private final String clusterID; private final VersionInfo version; /** @@ -168,47 +126,19 @@ public class SCMNodeManager */ public SCMNodeManager(OzoneConfiguration conf, String clusterID, StorageContainerManager scmManager) throws IOException { - heartbeatQueue = new ConcurrentLinkedQueue<>(); - healthyNodes = new ConcurrentHashMap<>(); - deadNodes = new ConcurrentHashMap<>(); - staleNodes = new ConcurrentHashMap<>(); - nodes = new ConcurrentHashMap<>(); - nodeStats = new ConcurrentHashMap<>(); - scmStat = new SCMNodeStat(); - - healthyNodeCount = new AtomicInteger(0); - staleNodeCount = new AtomicInteger(0); - deadNodeCount = new AtomicInteger(0); - totalNodes = new AtomicInteger(0); + this.nodeStateManager = new NodeStateManager(conf); + this.nodeStats = new ConcurrentHashMap<>(); + this.scmStat = new SCMNodeStat(); this.clusterID = clusterID; this.version = VersionInfo.getLatestVersion(); - commandQueue = new CommandQueue(); - + this.commandQueue = new CommandQueue(); // TODO: Support this value as a Percentage of known machines. 
- chillModeNodeCount = 1; - - staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf); - deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf); - heartbeatCheckerIntervalMs = - HddsServerUtil.getScmheartbeatCheckerInterval(conf); - datanodeHBIntervalSeconds = HddsServerUtil.getScmHeartbeatInterval(conf); - maxHBToProcessPerLoop = HddsServerUtil.getMaxHBToProcessPerLoop(conf); - - executorService = HadoopExecutors.newScheduledThreadPool(1, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("SCM Heartbeat Processing Thread - %d").build()); - - LOG.info("Entering startup chill mode."); + this.chillModeNodeCount = 1; this.inStartupChillMode = new AtomicBoolean(true); this.inManualChillMode = new AtomicBoolean(false); - - Preconditions.checkState(heartbeatCheckerIntervalMs > 0); - executorService.schedule(this, heartbeatCheckerIntervalMs, - TimeUnit.MILLISECONDS); - - registerMXBean(); - this.scmManager = scmManager; + LOG.info("Entering startup chill mode."); + registerMXBean(); } private void registerMXBean() { @@ -227,12 +157,11 @@ public class SCMNodeManager * Removes a data node from the management of this Node Manager. * * @param node - DataNode. - * @throws UnregisteredNodeException + * @throws NodeNotFoundException */ @Override - public void removeNode(DatanodeDetails node) { - // TODO : Fix me when adding the SCM CLI. - + public void removeNode(DatanodeDetails node) throws NodeNotFoundException { + nodeStateManager.removeNode(node); } /** @@ -244,31 +173,8 @@ public class SCMNodeManager * @return List of Datanodes that are known to SCM in the requested state. 
*/ @Override - public List<DatanodeDetails> getNodes(NodeState nodestate) - throws IllegalArgumentException { - Map<UUID, Long> set; - switch (nodestate) { - case HEALTHY: - synchronized (this) { - set = Collections.unmodifiableMap(new HashMap<>(healthyNodes)); - } - break; - case STALE: - synchronized (this) { - set = Collections.unmodifiableMap(new HashMap<>(staleNodes)); - } - break; - case DEAD: - synchronized (this) { - set = Collections.unmodifiableMap(new HashMap<>(deadNodes)); - } - break; - default: - throw new IllegalArgumentException("Unknown node state requested."); - } - - return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) - .collect(Collectors.toList()); + public List<DatanodeDetails> getNodes(NodeState nodestate) { + return nodeStateManager.getNodes(nodestate); } /** @@ -278,12 +184,7 @@ public class SCMNodeManager */ @Override public List<DatanodeDetails> getAllNodes() { - Map<UUID, DatanodeDetails> set; - synchronized (this) { - set = Collections.unmodifiableMap(new HashMap<>(nodes)); - } - return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) - .collect(Collectors.toList()); + return nodeStateManager.getAllNodes(); } /** @@ -315,14 +216,16 @@ public class SCMNodeManager if (inStartupChillMode.get()) { return "Still in chill mode, waiting on nodes to report in." + String.format(" %d nodes reported, minimal %d nodes required.", - totalNodes.get(), getMinimumChillModeNodes()); + nodeStateManager.getTotalNodeCount(), getMinimumChillModeNodes()); } if (inManualChillMode.get()) { return "Out of startup chill mode, but in manual chill mode." + - String.format(" %d nodes have reported in.", totalNodes.get()); + String.format(" %d nodes have reported in.", + nodeStateManager.getTotalNodeCount()); } return "Out of chill mode." 
+ - String.format(" %d nodes have reported in.", totalNodes.get()); + String.format(" %d nodes have reported in.", + nodeStateManager.getTotalNodeCount()); } /** @@ -376,33 +279,7 @@ public class SCMNodeManager */ @Override public int getNodeCount(NodeState nodestate) { - switch (nodestate) { - case HEALTHY: - return healthyNodeCount.get(); - case STALE: - return staleNodeCount.get(); - case DEAD: - return deadNodeCount.get(); - case INVALID: - // This is unknown due to the fact that some nodes can be in - // transit between the other states. Returning a count for that is not - // possible. The fact that we have such state is to deal with the fact - // that this information might not be consistent always. - return 0; - default: - return 0; - } - } - - /** - * Used for testing. - * - * @return true if the HB check is done. - */ - @VisibleForTesting - @Override - public boolean waitForHeartbeatProcessed() { - return lastHBcheckFinished != 0; + return nodeStateManager.getNodeCount(nodestate); } /** @@ -413,236 +290,14 @@ public class SCMNodeManager */ @Override public NodeState getNodeState(DatanodeDetails datanodeDetails) { - // There is a subtle race condition here, hence we also support - // the NODEState.UNKNOWN. It is possible that just before we check the - // healthyNodes, we have removed the node from the healthy list but stil - // not added it to Stale Nodes list. - // We can fix that by adding the node to stale list before we remove, but - // then the node is in 2 states to avoid this race condition. Instead we - // just deal with the possibilty of getting a state called unknown. - - UUID id = datanodeDetails.getUuid(); - if(healthyNodes.containsKey(id)) { - return HEALTHY; - } - - if(staleNodes.containsKey(id)) { - return STALE; - } - - if(deadNodes.containsKey(id)) { - return DEAD; - } - - return INVALID; - } - - /** - * This is the real worker thread that processes the HB queue. We do the - * following things in this thread. 
- * <p> - * Process the Heartbeats that are in the HB Queue. Move Stale or Dead node to - * healthy if we got a heartbeat from them. Move Stales Node to dead node - * table if it is needed. Move healthy nodes to stale nodes if it is needed. - * <p> - * if it is a new node, we call register node and add it to the list of nodes. - * This will be replaced when we support registration of a node in SCM. - * - * @see Thread#run() - */ - @Override - public void run() { - lastHBcheckStart = monotonicNow(); - lastHBProcessedCount = 0; - - // Process the whole queue. - while (!heartbeatQueue.isEmpty() && - (lastHBProcessedCount < maxHBToProcessPerLoop)) { - HeartbeatQueueItem hbItem = heartbeatQueue.poll(); - synchronized (this) { - handleHeartbeat(hbItem); - } - // we are shutting down or something give up processing the rest of - // HBs. This will terminate the HB processing thread. - if (Thread.currentThread().isInterrupted()) { - LOG.info("Current Thread is isInterrupted, shutting down HB " + - "processing thread for Node Manager."); - return; - } - } - - if (lastHBProcessedCount >= maxHBToProcessPerLoop) { - LOG.error("SCM is being flooded by heartbeats. Not able to keep up with" + - " the heartbeat counts. Processed {} heartbeats. Breaking out of" + - " loop. Leaving rest to be processed later. ", lastHBProcessedCount); - } - - // Iterate over the Stale nodes and decide if we need to move any node to - // dead State. - long currentTime = monotonicNow(); - for (Map.Entry<UUID, Long> entry : staleNodes.entrySet()) { - if (currentTime - entry.getValue() > deadNodeIntervalMs) { - synchronized (this) { - moveStaleNodeToDead(entry); - } - } - } - - // Iterate over the healthy nodes and decide if we need to move any node to - // Stale State. 
- currentTime = monotonicNow(); - for (Map.Entry<UUID, Long> entry : healthyNodes.entrySet()) { - if (currentTime - entry.getValue() > staleNodeIntervalMs) { - synchronized (this) { - moveHealthyNodeToStale(entry); - } - } - } - lastHBcheckFinished = monotonicNow(); - - monitorHBProcessingTime(); - - // we purposefully make this non-deterministic. Instead of using a - // scheduleAtFixedFrequency we will just go to sleep - // and wake up at the next rendezvous point, which is currentTime + - // heartbeatCheckerIntervalMs. This leads to the issue that we are now - // heart beating not at a fixed cadence, but clock tick + time taken to - // work. - // - // This time taken to work can skew the heartbeat processor thread. - // The reason why we don't care is because of the following reasons. - // - // 1. checkerInterval is general many magnitudes faster than datanode HB - // frequency. - // - // 2. if we have too much nodes, the SCM would be doing only HB - // processing, this could lead to SCM's CPU starvation. With this - // approach we always guarantee that HB thread sleeps for a little while. - // - // 3. It is possible that we will never finish processing the HB's in the - // thread. But that means we have a mis-configured system. We will warn - // the users by logging that information. - // - // 4. And the most important reason, heartbeats are not blocked even if - // this thread does not run, they will go into the processing queue. - - if (!Thread.currentThread().isInterrupted() && - !executorService.isShutdown()) { - executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit - .MILLISECONDS); - } else { - LOG.info("Current Thread is interrupted, shutting down HB processing " + - "thread for Node Manager."); - } - } - - /** - * If we have taken too much time for HB processing, log that information. 
- */ - private void monitorHBProcessingTime() { - if (TimeUnit.MILLISECONDS.toSeconds(lastHBcheckFinished - - lastHBcheckStart) > datanodeHBIntervalSeconds) { - LOG.error("Total time spend processing datanode HB's is greater than " + - "configured values for datanode heartbeats. Please adjust the" + - " heartbeat configs. Time Spend on HB processing: {} seconds " + - "Datanode heartbeat Interval: {} seconds , heartbeats " + - "processed: {}", - TimeUnit.MILLISECONDS - .toSeconds(lastHBcheckFinished - lastHBcheckStart), - datanodeHBIntervalSeconds, lastHBProcessedCount); - } - } - - /** - * Moves a Healthy node to a Stale node state. - * - * @param entry - Map Entry - */ - private void moveHealthyNodeToStale(Map.Entry<UUID, Long> entry) { - LOG.trace("Moving healthy node to stale: {}", entry.getKey()); - healthyNodes.remove(entry.getKey()); - healthyNodeCount.decrementAndGet(); - staleNodes.put(entry.getKey(), entry.getValue()); - staleNodeCount.incrementAndGet(); - - if (scmManager != null) { - // remove stale node's container report - scmManager.removeContainerReport(entry.getKey().toString()); + try { + return nodeStateManager.getNodeState(datanodeDetails); + } catch (NodeNotFoundException e) { + // TODO: should we throw NodeNotFoundException? + return null; } } - /** - * Moves a Stale node to a dead node state. - * - * @param entry - Map Entry - */ - private void moveStaleNodeToDead(Map.Entry<UUID, Long> entry) { - LOG.trace("Moving stale node to dead: {}", entry.getKey()); - staleNodes.remove(entry.getKey()); - staleNodeCount.decrementAndGet(); - deadNodes.put(entry.getKey(), entry.getValue()); - deadNodeCount.incrementAndGet(); - - // Update SCM node stats - SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey()); - scmStat.subtract(deadNodeStat); - nodeStats.remove(entry.getKey()); - } - - /** - * Handles a single heartbeat from a datanode. - * - * @param hbItem - heartbeat item from a datanode. 
- */ - private void handleHeartbeat(HeartbeatQueueItem hbItem) { - lastHBProcessedCount++; - - DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails(); - UUID datanodeUuid = datanodeDetails.getUuid(); - NodeReportProto nodeReport = hbItem.getNodeReport(); - long recvTimestamp = hbItem.getRecvTimestamp(); - long processTimestamp = Time.monotonicNow(); - if (LOG.isTraceEnabled()) { - //TODO: add average queue time of heartbeat request as metrics - LOG.trace("Processing Heartbeat from datanode {}: queueing time {}", - datanodeUuid, processTimestamp - recvTimestamp); - } - - // If this node is already in the list of known and healthy nodes - // just set the last timestamp and return. - if (healthyNodes.containsKey(datanodeUuid)) { - healthyNodes.put(datanodeUuid, processTimestamp); - updateNodeStat(datanodeUuid, nodeReport); - return; - } - - // A stale node has heartbeat us we need to remove the node from stale - // list and move to healthy list. - if (staleNodes.containsKey(datanodeUuid)) { - staleNodes.remove(datanodeUuid); - healthyNodes.put(datanodeUuid, processTimestamp); - healthyNodeCount.incrementAndGet(); - staleNodeCount.decrementAndGet(); - updateNodeStat(datanodeUuid, nodeReport); - return; - } - - // A dead node has heartbeat us, we need to remove that node from dead - // node list and move it to the healthy list. 
- if (deadNodes.containsKey(datanodeUuid)) { - deadNodes.remove(datanodeUuid); - healthyNodes.put(datanodeUuid, processTimestamp); - deadNodeCount.decrementAndGet(); - healthyNodeCount.incrementAndGet(); - updateNodeStat(datanodeUuid, nodeReport); - return; - } - - LOG.warn("SCM receive heartbeat from unregistered datanode {}", - datanodeUuid); - this.commandQueue.addCommand(datanodeUuid, - new ReregisterCommand()); - } private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) { SCMNodeStat stat = nodeStats.get(dnId); @@ -679,24 +334,6 @@ public class SCMNodeManager @Override public void close() throws IOException { unregisterMXBean(); - executorService.shutdown(); - try { - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - LOG.error("Unable to shutdown NodeManager properly."); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } - } - - @VisibleForTesting - long getLastHBProcessedCount() { - return lastHBProcessedCount; } /** @@ -739,27 +376,22 @@ public class SCMNodeManager datanodeDetails.setHostName(hostname); datanodeDetails.setIpAddress(ip); } - RegisteredCommand responseCommand = verifyDatanodeUUID(datanodeDetails); - if (responseCommand != null) { - return responseCommand; - } UUID dnId = datanodeDetails.getUuid(); - nodes.put(dnId, datanodeDetails); - totalNodes.incrementAndGet(); - healthyNodes.put(dnId, monotonicNow()); - healthyNodeCount.incrementAndGet(); - nodeStats.put(dnId, new SCMNodeStat()); - - if(inStartupChillMode.get() && - totalNodes.get() >= getMinimumChillModeNodes()) { - inStartupChillMode.getAndSet(false); - LOG.info("Leaving startup chill mode."); + try { + nodeStateManager.addNode(datanodeDetails); + nodeStats.put(dnId, new SCMNodeStat()); + if(inStartupChillMode.get() && + nodeStateManager.getTotalNodeCount() >= getMinimumChillModeNodes()) { + 
inStartupChillMode.getAndSet(false); + LOG.info("Leaving startup chill mode."); + } + // Updating Node Report, as registration is successful + updateNodeStat(datanodeDetails.getUuid(), nodeReport); + LOG.info("Data node with ID: {} Registered.", datanodeDetails.getUuid()); + } catch (NodeAlreadyExistsException e) { + LOG.trace("Datanode is already registered. Datanode: {}", + datanodeDetails.toString()); } - - // Updating Node Report, as registration is successful - updateNodeStat(datanodeDetails.getUuid(), nodeReport); - LOG.info("Data node with ID: {} Registered.", - datanodeDetails.getUuid()); RegisteredCommand.Builder builder = RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success) .setDatanodeUUID(datanodeDetails.getUuidString()) @@ -771,45 +403,24 @@ public class SCMNodeManager } /** - * Verifies the datanode does not have a valid UUID already. - * - * @param datanodeDetails - Datanode Details. - * @return SCMCommand - */ - private RegisteredCommand verifyDatanodeUUID( - DatanodeDetails datanodeDetails) { - if (datanodeDetails.getUuid() != null && - nodes.containsKey(datanodeDetails.getUuid())) { - LOG.trace("Datanode is already registered. Datanode: {}", - datanodeDetails.toString()); - return RegisteredCommand.newBuilder() - .setErrorCode(ErrorCode.success) - .setClusterID(this.clusterID) - .setDatanodeUUID(datanodeDetails.getUuidString()) - .build(); - } - return null; - } - - /** * Send heartbeat to indicate the datanode is alive and doing well. * * @param datanodeDetails - DatanodeDetailsProto. - * @param nodeReport - node report. * @return SCMheartbeat response. 
* @throws IOException */ @Override - public List<SCMCommand> sendHeartbeat( - DatanodeDetails datanodeDetails, NodeReportProto nodeReport) { - + public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) { Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " + "DatanodeDetails."); - heartbeatQueue.add( - new HeartbeatQueueItem.Builder() - .setDatanodeDetails(datanodeDetails) - .setNodeReport(nodeReport) - .build()); + try { + nodeStateManager.updateLastHeartbeatTime(datanodeDetails); + } catch (NodeNotFoundException e) { + LOG.warn("SCM receive heartbeat from unregistered datanode {}", + datanodeDetails); + commandQueue.addCommand(datanodeDetails.getUuid(), + new ReregisterCommand()); + } return commandQueue.getCommand(datanodeDetails.getUuid()); } @@ -855,11 +466,6 @@ public class SCMNodeManager this.commandQueue.addCommand(dnId, command); } - @VisibleForTesting - public void setStaleNodeIntervalMs(long interval) { - this.staleNodeIntervalMs = interval; - } - @Override public void onMessage(CommandForDatanode commandForDatanode, EventPublisher publisher) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java new file mode 100644 index 0000000..aa5c382 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.node.states; + +/** + * This exception represents that there is already a node added to NodeStateMap + * with the same UUID. + */ +public class NodeAlreadyExistsException extends NodeException { + + /** + * Constructs a {@code NodeAlreadyExistsException} with {@code null} + * as its error detail message. + */ + public NodeAlreadyExistsException() { + super(); + } + + /** + * Constructs a {@code NodeAlreadyExistsException} with the specified + * detail message.
+ * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public NodeAlreadyExistsException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java new file mode 100644 index 0000000..c67b55d --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.node.states; + +/** + * This exception represents all node-related exceptions in NodeStateMap. + */ +public class NodeException extends Exception { + + /** + * Constructs a {@code NodeException} with {@code null} + * as its error detail message.
+ */ + public NodeException() { + super(); + } + + /** + * Constructs a {@code NodeException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public NodeException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/71df8c27/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java new file mode 100644 index 0000000..52a352e --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeNotFoundException.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.node.states; + +import java.io.IOException; + +/** + * This exception represents that the node that is being accessed does not + * exist in NodeStateMap.
+ */ +public class NodeNotFoundException extends NodeException { + + + /** + * Constructs a {@code NodeNotFoundException} with {@code null} + * as its error detail message. + */ + public NodeNotFoundException() { + super(); + } + + /** + * Constructs a {@code NodeNotFoundException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public NodeNotFoundException(String message) { + super(message); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
