HDDS-561. Move Node2ContainerMap and Node2PipelineMap to NodeManager. Contributed by Lokesh Jain.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3929626 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3929626 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3929626 Branch: refs/heads/HEAD Commit: a39296260f8c77f3808e27b43c623a0edebe4a17 Parents: 81072d5 Author: Nanda kumar <na...@apache.org> Authored: Tue Oct 2 19:47:15 2018 +0530 Committer: Nanda kumar <na...@apache.org> Committed: Tue Oct 2 19:47:44 2018 +0530 ---------------------------------------------------------------------- .../scm/container/ContainerReportHandler.java | 16 ++-- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 11 +-- .../hadoop/hdds/scm/node/NewNodeHandler.java | 9 +- .../hadoop/hdds/scm/node/NodeManager.java | 61 ++++++++++++++ .../hadoop/hdds/scm/node/NodeStateManager.java | 88 +++++++++++++++++++- .../hadoop/hdds/scm/node/SCMNodeManager.java | 83 ++++++++++++++++++ .../hdds/scm/node/SCMNodeStorageStatMap.java | 2 +- .../hadoop/hdds/scm/node/StaleNodeHandler.java | 9 +- .../hdds/scm/node/states/Node2ContainerMap.java | 2 +- .../hdds/scm/node/states/Node2PipelineMap.java | 75 +++++++++++++++++ .../hdds/scm/pipelines/Node2PipelineMap.java | 75 ----------------- .../hdds/scm/pipelines/PipelineSelector.java | 18 ++-- .../scm/server/StorageContainerManager.java | 14 ++-- .../hdds/scm/container/MockNodeManager.java | 87 +++++++++++++++++++ .../container/TestContainerReportHandler.java | 14 ++-- .../hdds/scm/node/TestDeadNodeHandler.java | 27 ++---- .../testutils/ReplicationNodeManagerMock.java | 82 ++++++++++++++++++ .../hdds/scm/pipeline/TestNode2PipelineMap.java | 2 +- .../hdds/scm/pipeline/TestPipelineClose.java | 2 +- 19 files changed, 520 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 3f156de..71935f0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatu import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -48,7 +48,7 @@ public class ContainerReportHandler implements private static final Logger LOG = LoggerFactory.getLogger(ContainerReportHandler.class); - private final Node2ContainerMap node2ContainerMap; + private final NodeManager nodeManager; private final Mapping containerMapping; @@ -57,14 +57,14 @@ public class ContainerReportHandler implements private ReplicationActivityStatus replicationStatus; public ContainerReportHandler(Mapping containerMapping, - Node2ContainerMap node2ContainerMap, + NodeManager nodeManager, ReplicationActivityStatus replicationActivityStatus) { Preconditions.checkNotNull(containerMapping); - 
Preconditions.checkNotNull(node2ContainerMap); + Preconditions.checkNotNull(nodeManager); Preconditions.checkNotNull(replicationActivityStatus); this.containerStateManager = containerMapping.getStateManager(); + this.nodeManager = nodeManager; this.containerMapping = containerMapping; - this.node2ContainerMap = node2ContainerMap; this.replicationStatus = replicationActivityStatus; } @@ -89,11 +89,11 @@ public class ContainerReportHandler implements .map(ContainerID::new) .collect(Collectors.toSet()); - ReportResult<ContainerID> reportResult = node2ContainerMap - .processReport(datanodeOrigin.getUuid(), containerIds); + ReportResult<ContainerID> reportResult = nodeManager + .processContainerReport(datanodeOrigin.getUuid(), containerIds); //we have the report, so we can update the states for the next iteration. - node2ContainerMap + nodeManager .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds); for (ContainerID containerID : reportResult.getMissingEntries()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 253b3ec..17edf9e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerStateManager; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import 
org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -38,8 +37,6 @@ import org.slf4j.LoggerFactory; */ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { - private final Node2ContainerMap node2ContainerMap; - private final ContainerStateManager containerStateManager; private final NodeManager nodeManager; @@ -47,10 +44,8 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { private static final Logger LOG = LoggerFactory.getLogger(DeadNodeHandler.class); - public DeadNodeHandler( - Node2ContainerMap node2ContainerMap, - ContainerStateManager containerStateManager, NodeManager nodeManager) { - this.node2ContainerMap = node2ContainerMap; + public DeadNodeHandler(NodeManager nodeManager, + ContainerStateManager containerStateManager) { this.containerStateManager = containerStateManager; this.nodeManager = nodeManager; } @@ -61,7 +56,7 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { nodeManager.processDeadNode(datanodeDetails.getUuid()); Set<ContainerID> containers = - node2ContainerMap.getContainers(datanodeDetails.getUuid()); + nodeManager.getContainers(datanodeDetails.getUuid()); if (containers == null) { LOG.info("There's no containers in dead datanode {}, no replica will be" + " removed from the in-memory state.", datanodeDetails.getUuid()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java index 79b75a5..780aa2b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -31,17 +30,17 @@ import java.util.Collections; */ public class NewNodeHandler implements EventHandler<DatanodeDetails> { - private final Node2ContainerMap node2ContainerMap; + private final NodeManager nodeManager; - public NewNodeHandler(Node2ContainerMap node2ContainerMap) { - this.node2ContainerMap = node2ContainerMap; + public NewNodeHandler(NodeManager nodeManager) { + this.nodeManager = nodeManager; } @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { try { - node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(), + nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(), Collections.emptySet()); } catch (SCMException e) { // TODO: log exception message. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index fad3ee3..0dc1a0c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -18,11 +18,16 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -31,6 +36,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import java.io.Closeable; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; /** @@ -134,6 +140,61 @@ public interface NodeManager extends StorageContainerNodeProtocol, NodeState getNodeState(DatanodeDetails datanodeDetails); /** + * Get 
set of pipelines a datanode is part of. + * @param dnId - datanodeID + * @return Set of PipelineID + */ + Set<PipelineID> getPipelineByDnID(UUID dnId); + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + void addPipeline(Pipeline pipeline); + + /** + * Remove a pipeline information from the NodeManager. + * @param pipeline - Pipeline to be removed + */ + void removePipeline(Pipeline pipeline); + + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) + throws SCMException; + + /** + * Process containerReport received from datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + ReportResult<ContainerID> processContainerReport(UUID uuid, + Set<ContainerID> containerIds); + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + Set<ContainerID> getContainers(UUID uuid); + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs) + throws SCMException; + + /** * Add a {@link SCMCommand} to the command queue, which are * handled by HB thread asynchronously. 
* @param dnId datanode uuid http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 1f99ffe..88f984b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -24,11 +24,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; -import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.node.states.NodeStateMap; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.states.*; +import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.common.statemachine @@ -87,6 +90,15 @@ public class NodeStateManager implements Runnable, Closeable { */ private final NodeStateMap nodeStateMap; /** + * Maintains the mapping from node to pipelines a node is part of. 
+ */ + private final Node2PipelineMap node2PipelineMap; + /** + * Maintains the map from node to ContainerIDs for the containers + * available on the node. + */ + private final Node2ContainerMap node2ContainerMap; + /** * Used for publishing node state change events. */ private final EventPublisher eventPublisher; @@ -118,6 +130,8 @@ public class NodeStateManager implements Runnable, Closeable { */ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { this.nodeStateMap = new NodeStateMap(); + this.node2PipelineMap = new Node2PipelineMap(); + this.node2ContainerMap = new Node2ContainerMap(); this.eventPublisher = eventPublisher; this.state2EventMap = new HashMap<>(); initialiseState2EventMap(); @@ -243,6 +257,14 @@ public class NodeStateManager implements Runnable, Closeable { } /** + * Adds a pipeline in the node2PipelineMap. + * @param pipeline - Pipeline to be added + */ + public void addPipeline(Pipeline pipeline) { + node2PipelineMap.addPipeline(pipeline); + } + + /** * Get information about the node. * * @param datanodeDetails DatanodeDetails @@ -353,6 +375,15 @@ public class NodeStateManager implements Runnable, Closeable { } /** + * Gets set of pipelineID a datanode belongs to. + * @param dnId - Datanode ID + * @return Set of PipelineID + */ + public Set<PipelineID> getPipelineByDnID(UUID dnId) { + return node2PipelineMap.getPipelines(dnId); + } + + /** * Returns the count of healthy nodes. * * @return healthy node count @@ -457,6 +488,57 @@ public class NodeStateManager implements Runnable, Closeable { } /** + * Removes a pipeline from the node2PipelineMap. + * @param pipeline - Pipeline to be removed + */ + public void removePipeline(Pipeline pipeline) { + node2PipelineMap.removePipeline(pipeline); + } + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. 
For new datanode use + * addDatanodeInContainerMap call. + */ + public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) + throws SCMException { + node2ContainerMap.setContainersForDatanode(uuid, containerIds); + } + + /** + * Process containerReport received from datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + public ReportResult<ContainerID> processContainerReport(UUID uuid, + Set<ContainerID> containerIds) { + return node2ContainerMap.processReport(uuid, containerIds); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + public Set<ContainerID> getContainers(UUID uuid) { + return node2ContainerMap.getContainers(uuid); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + public void addDatanodeInContainerMap(UUID uuid, + Set<ContainerID> containerIDs) throws SCMException { + node2ContainerMap.insertNewDatanode(uuid, containerIDs); + } + + /** * Move Stale or Dead node to healthy if we got a heartbeat from them. * Move healthy nodes to stale nodes if it is needed. * Move Stales node to dead if needed. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 620c816..36a6f15 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -21,8 +21,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -59,6 +64,7 @@ import java.net.InetAddress; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -471,6 +477,83 @@ public class SCMNodeManager return nodeCountMap; } + /** + * Get set of pipelines a datanode is part of. 
+ * @param dnId - datanodeID + * @return Set of PipelineID + */ + @Override + public Set<PipelineID> getPipelineByDnID(UUID dnId) { + return nodeStateManager.getPipelineByDnID(dnId); + } + + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + @Override + public void addPipeline(Pipeline pipeline) { + nodeStateManager.addPipeline(pipeline); + } + + /** + * Remove a pipeline information from the NodeManager. + * @param pipeline - Pipeline to be removed + */ + @Override + public void removePipeline(Pipeline pipeline) { + nodeStateManager.removePipeline(pipeline); + } + + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + @Override + public void setContainersForDatanode(UUID uuid, + Set<ContainerID> containerIds) throws SCMException { + nodeStateManager.setContainersForDatanode(uuid, containerIds); + } + + /** + * Process containerReport received from datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + @Override + public ReportResult<ContainerID> processContainerReport(UUID uuid, + Set<ContainerID> containerIds) { + return nodeStateManager.processContainerReport(uuid, containerIds); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + @Override + public Set<ContainerID> getContainers(UUID uuid) { + return nodeStateManager.getContainers(uuid); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. 
+ * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + @Override + public void addDatanodeInContainerMap(UUID uuid, + Set<ContainerID> containerIDs) throws SCMException { + nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs); + } + // TODO: // Since datanode commands are added through event queue, onMessage method // should take care of adding commands to command queue. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java index 6ea83df..1b0e5b5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java @@ -147,7 +147,7 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { * @param datanodeID - UUID of DN. * @param report - set of Storage Reports for the Datanode. * @throws SCMException - if we don't know about this datanode, for new DN - * use insertNewDatanode. + * use addDatanodeInContainerMap. 
*/ public void updateDatanodeMap(UUID datanodeID, Set<StorageLocationReport> report) throws SCMException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index ddbba82..48939f1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -19,25 +19,18 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Handles Stale node event. 
*/ public class StaleNodeHandler implements EventHandler<DatanodeDetails> { - static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class); - private final Node2ContainerMap node2ContainerMap; private final PipelineSelector pipelineSelector; - public StaleNodeHandler(Node2ContainerMap node2ContainerMap, - PipelineSelector pipelineSelector) { - this.node2ContainerMap = node2ContainerMap; + public StaleNodeHandler(PipelineSelector pipelineSelector) { this.pipelineSelector = pipelineSelector; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java index 549080a..9625f81 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java @@ -70,7 +70,7 @@ public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> { * @param datanodeID - UUID of DN. * @param containers - Set of Containers tht is present on DN. * @throws SCMException - if we don't know about this datanode, for new DN - * use insertNewDatanode. + * use addDatanodeInContainerMap. 
*/ public void setContainersForDatanode(UUID datanodeID, Set<ContainerID> containers) throws SCMException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java new file mode 100644 index 0000000..87f2222 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.node.states; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +/** + * This data structure maintains the list of pipelines which the given datanode is a part of. 
This + * information will be added whenever a new pipeline allocation happens. + * + * <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart + */ +public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> { + + /** Constructs a Node2PipelineMap Object. */ + public Node2PipelineMap() { + super(); + } + + /** + * Returns null if there no pipelines associated with this datanode ID. + * + * @param datanode - UUID + * @return Set of pipelines or Null. + */ + public Set<PipelineID> getPipelines(UUID datanode) { + return getObjects(datanode); + } + + /** + * Adds a pipeline entry to a given dataNode in the map. + * + * @param pipeline Pipeline to be added + */ + public synchronized void addPipeline(Pipeline pipeline) { + for (DatanodeDetails details : pipeline.getDatanodes().values()) { + UUID dnId = details.getUuid(); + dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>()) + .add(pipeline.getId()); + } + } + + public synchronized void removePipeline(Pipeline pipeline) { + for (DatanodeDetails details : pipeline.getDatanodes().values()) { + UUID dnId = details.getUuid(); + dn2ObjectMap.computeIfPresent(dnId, + (k, v) -> { + v.remove(pipeline.getId()); + return v; + }); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java deleted file mode 100644 index 87f2222..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * - */ - -package org.apache.hadoop.hdds.scm.node.states; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; - -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; - -/** - * This data structure maintains the list of pipelines which the given datanode is a part of. This - * information will be added whenever a new pipeline allocation happens. - * - * <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart - */ -public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> { - - /** Constructs a Node2PipelineMap Object. */ - public Node2PipelineMap() { - super(); - } - - /** - * Returns null if there no pipelines associated with this datanode ID. - * - * @param datanode - UUID - * @return Set of pipelines or Null. - */ - public Set<PipelineID> getPipelines(UUID datanode) { - return getObjects(datanode); - } - - /** - * Adds a pipeline entry to a given dataNode in the map. 
- * - * @param pipeline Pipeline to be added - */ - public synchronized void addPipeline(Pipeline pipeline) { - for (DatanodeDetails details : pipeline.getDatanodes().values()) { - UUID dnId = details.getUuid(); - dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>()) - .add(pipeline.getId()); - } - } - - public synchronized void removePipeline(Pipeline pipeline) { - for (DatanodeDetails details : pipeline.getDatanodes().values()) { - UUID dnId = details.getUuid(); - dn2ObjectMap.computeIfPresent(dnId, - (k, v) -> { - v.remove(pipeline.getId()); - return v; - }); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index 59d937e..c8d22ff 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl; import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -61,7 +60,6 @@ import java.util.HashMap; import java.util.Set; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.UUID; import java.util.concurrent.TimeUnit; import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes @@ -85,7 +83,7 @@ public class PipelineSelector { private final long containerSize; private final MetadataStore pipelineStore; private final PipelineStateManager stateManager; - private final Node2PipelineMap node2PipelineMap; + private final NodeManager nodeManager; private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap; private final Map<PipelineID, Pipeline> pipelineMap; private final LeaseManager<Pipeline> pipelineLeaseManager; @@ -105,7 +103,6 @@ public class PipelineSelector { ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - node2PipelineMap = new Node2PipelineMap(); pipelineMap = new ConcurrentHashMap<>(); pipelineManagerMap = new HashMap<>(); @@ -124,6 +121,7 @@ public class PipelineSelector { pipelineLeaseManager.start(); stateManager = new PipelineStateManager(); + this.nodeManager = nodeManager; pipeline2ContainerMap = new HashMap<>(); // Write the container name to pipeline mapping. @@ -361,10 +359,6 @@ public class PipelineSelector { } } - public Set<PipelineID> getPipelineByDnID(UUID dnId) { - return node2PipelineMap.getPipelines(dnId); - } - private void addExistingPipeline(Pipeline pipeline) throws IOException { LifeCycleState state = pipeline.getLifeCycleState(); switch (state) { @@ -379,7 +373,7 @@ public class PipelineSelector { // when all the nodes have reported. 
pipelineMap.put(pipeline.getId(), pipeline); pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); - node2PipelineMap.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); // reset the datanodes in the pipeline // they will be reset on pipeline.resetPipeline(); @@ -393,7 +387,7 @@ public class PipelineSelector { } public void handleStaleNode(DatanodeDetails dn) { - Set<PipelineID> pipelineIDs = getPipelineByDnID(dn.getUuid()); + Set<PipelineID> pipelineIDs = nodeManager.getPipelineByDnID(dn.getUuid()); for (PipelineID id : pipelineIDs) { LOG.info("closing pipeline {}.", id); eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id); @@ -436,7 +430,7 @@ public class PipelineSelector { case CREATE: pipelineMap.put(pipeline.getId(), pipeline); pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); - node2PipelineMap.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); // Acquire lease on pipeline Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline); // Register callback to be executed in case of timeout @@ -459,7 +453,7 @@ public class PipelineSelector { case TIMEOUT: closePipeline(pipeline); pipeline2ContainerMap.remove(pipeline.getId()); - node2PipelineMap.removePipeline(pipeline); + nodeManager.removePipeline(pipeline); pipelineMap.remove(pipeline.getId()); break; default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index bb72075..a6a967c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -62,7 +62,6 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeReportHandler; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler; import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler; import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler; @@ -212,8 +211,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl scmBlockManager = new BlockManagerImpl( conf, getScmNodeManager(), scmContainerManager, eventQueue); - Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); - replicationStatus = new ReplicationActivityStatus(); CloseContainerEventHandler closeContainerHandler = @@ -226,18 +223,17 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl CommandStatusReportHandler cmdStatusReportHandler = new CommandStatusReportHandler(); - NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); + NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager); StaleNodeHandler staleNodeHandler = - new StaleNodeHandler(node2ContainerMap, - scmContainerManager.getPipelineSelector()); - DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap, - getScmContainerManager().getStateManager(), scmNodeManager); + new StaleNodeHandler(scmContainerManager.getPipelineSelector()); + DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, + getScmContainerManager().getStateManager()); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); PendingDeleteHandler pendingDeleteHandler = new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService()); ContainerReportHandler containerReportHandler = - new 
ContainerReportHandler(scmContainerManager, node2ContainerMap, + new ContainerReportHandler(scmContainerManager, scmNodeManager, replicationStatus); scmChillModeManager = new SCMChillModeManager(conf, getScmContainerManager().getStateManager().getAllContainers(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 0d90728..3221053 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -19,9 +19,14 @@ package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -29,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.NodeReportProto; import 
org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.VersionResponse; @@ -42,6 +48,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; @@ -73,12 +80,16 @@ public class MockNodeManager implements NodeManager { private final SCMNodeStat aggregateStat; private boolean chillmode; private final Map<UUID, List<SCMCommand>> commandMap; + private final Node2PipelineMap node2PipelineMap; + private final Node2ContainerMap node2ContainerMap; public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); this.staleNodes = new LinkedList<>(); this.deadNodes = new LinkedList<>(); this.nodeMetricMap = new HashMap<>(); + this.node2PipelineMap = new Node2PipelineMap(); + this.node2ContainerMap = new Node2ContainerMap(); aggregateStat = new SCMNodeStat(); if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { @@ -289,6 +300,34 @@ public class MockNodeManager implements NodeManager { return null; } + /** + * Get set of pipelines a datanode is part of. + * @param dnId - datanodeID + * @return Set of PipelineID + */ + @Override + public Set<PipelineID> getPipelineByDnID(UUID dnId) { + return node2PipelineMap.getPipelines(dnId); + } + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + @Override + public void addPipeline(Pipeline pipeline) { + node2PipelineMap.addPipeline(pipeline); + } + + /** + * Remove a pipeline information from the NodeManager. 
+ * @param pipeline - Pipeline to be removed + */ + @Override + public void removePipeline(Pipeline pipeline) { + node2PipelineMap.removePipeline(pipeline); + } + @Override public void addDatanodeCommand(UUID dnId, SCMCommand command) { if(commandMap.containsKey(dnId)) { @@ -313,6 +352,54 @@ public class MockNodeManager implements NodeManager { // do nothing } + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + @Override + public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) + throws SCMException { + node2ContainerMap.setContainersForDatanode(uuid, containerIds); + } + + /** + * Process containerReport received from datanode. + * @param uuid - DataonodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + @Override + public ReportResult<ContainerID> processContainerReport(UUID uuid, + Set<ContainerID> containerIds) { + return node2ContainerMap.processReport(uuid, containerIds); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + @Override + public Set<ContainerID> getContainers(UUID uuid) { + return node2ContainerMap.getContainers(uuid); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + @Override + public void addDatanodeInContainerMap(UUID uuid, + Set<ContainerID> containerIDs) throws SCMException { + node2ContainerMap.insertNewDatanode(uuid, containerIDs); + } + // Returns the number of commands that is queued to this node manager. 
public int getCommandCount(DatanodeDetails dd) { List<SCMCommand> list = commandMap.get(dd.getUuid()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index a59179b..f79ae1e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.replication .ReplicationActivityStatus; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; @@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory; public class TestContainerReportHandler implements EventPublisher { private List<Object> publishedEvents = new ArrayList<>(); + private final NodeManager nodeManager = new MockNodeManager(true, 1); private static final Logger LOG = LoggerFactory.getLogger(TestContainerReportHandler.class); @@ -73,7 +74,6 @@ public class TestContainerReportHandler implements EventPublisher { public void test() throws IOException { //GIVEN OzoneConfiguration conf = new OzoneConfiguration(); - Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); Mapping mapping = Mockito.mock(Mapping.class); 
PipelineSelector selector = Mockito.mock(PipelineSelector.class); @@ -96,17 +96,17 @@ public class TestContainerReportHandler implements EventPublisher { new ReplicationActivityStatus(); ContainerReportHandler reportHandler = - new ContainerReportHandler(mapping, node2ContainerMap, + new ContainerReportHandler(mapping, nodeManager, replicationActivityStatus); DatanodeDetails dn1 = TestUtils.randomDatanodeDetails(); DatanodeDetails dn2 = TestUtils.randomDatanodeDetails(); DatanodeDetails dn3 = TestUtils.randomDatanodeDetails(); DatanodeDetails dn4 = TestUtils.randomDatanodeDetails(); - node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>()); - node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>()); - node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>()); - node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn1.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn2.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn3.getUuid(), new HashSet<>()); + nodeManager.addDatanodeInContainerMap(dn4.getUuid(), new HashSet<>()); PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class); Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED, http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 6966322..7bba032 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -39,7 +39,6 @@ import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -60,7 +59,6 @@ public class TestDeadNodeHandler { private List<ReplicationRequest> sentEvents = new ArrayList<>(); private SCMNodeManager nodeManager; - private Node2ContainerMap node2ContainerMap; private ContainerStateManager containerStateManager; private NodeReportHandler nodeReportHandler; private DeadNodeHandler deadNodeHandler; @@ -70,14 +68,13 @@ public class TestDeadNodeHandler { @Before public void setup() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); - node2ContainerMap = new Node2ContainerMap(); containerStateManager = new ContainerStateManager(conf, Mockito.mock(Mapping.class), Mockito.mock(PipelineSelector.class)); eventQueue = new EventQueue(); nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue); - deadNodeHandler = new DeadNodeHandler(node2ContainerMap, - containerStateManager, nodeManager); + deadNodeHandler = new DeadNodeHandler(nodeManager, + containerStateManager); eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); publisher = Mockito.mock(EventPublisher.class); nodeReportHandler = new NodeReportHandler(nodeManager); @@ -96,8 +93,8 @@ public class TestDeadNodeHandler { ContainerInfo container3 = TestUtils.allocateContainer(containerStateManager); - registerReplicas(node2ContainerMap, datanode1, container1, container2); - registerReplicas(node2ContainerMap, datanode2, container1, container3); + registerReplicas(datanode1, container1, container2); + 
registerReplicas(datanode2, container1, container3); registerReplicas(containerStateManager, container1, datanode1, datanode2); registerReplicas(containerStateManager, container2, datanode1); @@ -105,13 +102,8 @@ public class TestDeadNodeHandler { TestUtils.closeContainer(containerStateManager, container1); - //WHEN datanode1 is dead deadNodeHandler.onMessage(datanode1, publisher); - //THEN - //node2ContainerMap has not been changed - Assert.assertEquals(2, node2ContainerMap.size()); - Set<DatanodeDetails> container1Replicas = containerStateManager.getContainerStateMap() .getContainerReplicas(new ContainerID(container1.getContainerID())); @@ -168,7 +160,7 @@ public class TestDeadNodeHandler { ContainerInfo container1 = TestUtils.allocateContainer(containerStateManager); - registerReplicas(node2ContainerMap, datanode1, container1); + registerReplicas(datanode1, container1); SCMNodeStat stat = nodeManager.getStats(); Assert.assertTrue(stat.getCapacity().get() == 300); @@ -211,7 +203,7 @@ public class TestDeadNodeHandler { ContainerInfo container1 = TestUtils.allocateContainer(containerStateManager); - registerReplicas(node2ContainerMap, dn1, container1); + registerReplicas(dn1, container1); deadNodeHandler.onMessage(dn1, eventQueue); Assert.assertTrue(logCapturer.getOutput().contains( @@ -226,12 +218,11 @@ public class TestDeadNodeHandler { datanodes); } - private void registerReplicas(Node2ContainerMap node2ConMap, - DatanodeDetails datanode, + private void registerReplicas(DatanodeDetails datanode, ContainerInfo... 
containers) throws SCMException { - node2ConMap - .insertNewDatanode(datanode.getUuid(), + nodeManager + .addDatanodeInContainerMap(datanode.getUuid(), Arrays.stream(containers) .map(container -> new ContainerID(container.getContainerID())) .collect(Collectors.toSet())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index a9a00ef..74c3932 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -19,8 +19,12 @@ package org.apache.hadoop.ozone.container.testutils; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.CommandQueue; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -30,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto 
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -39,6 +44,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; /** @@ -217,6 +223,82 @@ public class ReplicationNodeManagerMock implements NodeManager { } /** + * Get set of pipelines a datanode is part of. + * @param dnId - datanodeID + * @return Set of PipelineID + */ + @Override + public Set<PipelineID> getPipelineByDnID(UUID dnId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Add pipeline information in the NodeManager. + * @param pipeline - Pipeline to be added + */ + @Override + public void addPipeline(Pipeline pipeline) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Remove a pipeline information from the NodeManager. + * @param pipeline - Pipeline to be removed + */ + @Override + public void removePipeline(Pipeline pipeline) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Update set of containers available on a datanode. + * @param uuid - DatanodeID + * @param containerIds - Set of containerIDs + * @throws SCMException - if datanode is not known. For new datanode use + * addDatanodeInContainerMap call. + */ + @Override + public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) + throws SCMException { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Process containerReport received from datanode. 
+ * @param uuid - DataonodeID + * @param containerIds - Set of containerIDs + * @return The result after processing containerReport + */ + @Override + public ReportResult<ContainerID> processContainerReport(UUID uuid, + Set<ContainerID> containerIds) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Return set of containerIDs available on a datanode. + * @param uuid - DatanodeID + * @return - set of containerIDs + */ + @Override + public Set<ContainerID> getContainers(UUID uuid) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** + * Insert a new datanode with set of containerIDs for containers available + * on it. + * @param uuid - DatanodeID + * @param containerIDs - Set of ContainerIDs + * @throws SCMException - if datanode already exists + */ + @Override + public void addDatanodeInContainerMap(UUID uuid, + Set<ContainerID> containerIDs) throws SCMException { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** * Closes this stream and releases any system resources associated * with it. If the stream is already closed then invoking this * method has no effect. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index ad3798e..e61a909 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -97,7 +97,7 @@ public class TestNode2PipelineMap { Assert.assertEquals(3, dns.size()); // get pipeline details by dnid - Set<PipelineID> pipelines = mapping.getPipelineSelector() + Set<PipelineID> pipelines = scm.getScmNodeManager() .getPipelineByDnID(dns.get(0).getUuid()); Assert.assertEquals(1, pipelines.size()); pipelines.forEach(p -> Assert.assertEquals(p, http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3929626/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 5eabfb9..b02eae2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -119,7 +119,7 @@ public class TestPipelineClose { HddsProtos.LifeCycleState.CLOSED); for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) { // Assert that the pipeline has been removed from 
Node2PipelineMap as well - Assert.assertEquals(pipelineSelector.getPipelineByDnID( + Assert.assertEquals(scm.getScmNodeManager().getPipelineByDnID( dn.getUuid()).size(), 0); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org