This is an automated email from the ASF dual-hosted git repository. sammichen pushed a commit to branch HDDS-1564 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 44c783fcaefedc2f5b746a4a87c2f789351ce31c Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Thu Sep 5 11:51:40 2019 +0800 HDDS-1577. Add default pipeline placement policy implementation. (#1366) (cherry picked from commit b640a5f6d53830aee4b9c2a7d17bf57c987962cd) --- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 + .../common/src/main/resources/ozone-default.xml | 7 + .../apache/hadoop/hdds/scm/node/NodeManager.java | 14 + .../hadoop/hdds/scm/node/NodeStateManager.java | 9 + .../hadoop/hdds/scm/node/SCMNodeManager.java | 19 ++ .../hdds/scm/node/states/Node2ObjectsMap.java | 4 +- .../hdds/scm/node/states/Node2PipelineMap.java | 12 +- .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 338 +++++++++++++++++++++ .../hadoop/hdds/scm/container/MockNodeManager.java | 36 ++- .../scm/pipeline/TestPipelinePlacementPolicy.java | 197 ++++++++++++ .../testutils/ReplicationNodeManagerMock.java | 16 + 11 files changed, 654 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 02aba5c..7810362 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -315,6 +315,11 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT = "ozone.scm.pipeline.owner.container.count"; public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3; + // Pipeline placement policy: + // the max number of pipelines can a single datanode be engaged in. 
+ public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT = + "ozone.scm.datanode.max.pipeline.engagement"; + public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5; public static final String OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY = diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 0dfd179..1a3f76a 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -830,6 +830,13 @@ </description> </property> <property> + <name>ozone.scm.datanode.max.pipeline.engagement</name> + <value>5</value> + <tag>OZONE, SCM, PIPELINE</tag> + <description>Max number of pipelines per datanode can be engaged in. + </description> + </property> + <property> <name>ozone.scm.container.size</name> <value>5GB</value> <tag>OZONE, PERFORMANCE, MANAGEMENT</tag> diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index fd8bb87..37562fe 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -118,6 +119,13 @@ public interface NodeManager extends StorageContainerNodeProtocol, Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails); /** + * Get the count of pipelines a 
datanode is associated with. + * @param datanodeDetails DatanodeDetails + * @return The number of pipelines + */ + int getPipelinesCount(DatanodeDetails datanodeDetails); + + /** * Add pipeline information in the NodeManager. * @param pipeline - Pipeline to be added */ @@ -199,4 +207,10 @@ public interface NodeManager extends StorageContainerNodeProtocol, * @return the given datanode, or empty list if none found */ List<DatanodeDetails> getNodesByAddress(String address); + + /** + * Get cluster map as in network topology for this node manager. + * @return cluster map + */ + NetworkTopology getClusterNetworkTopologyMap(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 954cb0e..9d2a9f2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -284,6 +284,15 @@ public class NodeStateManager implements Runnable, Closeable { } /** + * Get the count of pipelines associated with a single datanode. + * @param datanodeDetails single datanode + * @return number of pipelines associated with it + */ + public int getPipelinesCount(DatanodeDetails datanodeDetails) { + return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid()); + } + + /** * Get information about the node. 
* * @param datanodeDetails DatanodeDetails diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index f077e72..66cca46 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -510,6 +510,16 @@ public class SCMNodeManager implements NodeManager { } /** + * Get the count of pipelines a datanode is associated with. + * @param datanodeDetails DatanodeDetails + * @return The number of pipelines + */ + @Override + public int getPipelinesCount(DatanodeDetails datanodeDetails) { + return nodeStateManager.getPipelinesCount(datanodeDetails); + } + + /** * Add pipeline information in the NodeManager. * * @param pipeline - Pipeline to be added @@ -643,6 +653,15 @@ public class SCMNodeManager implements NodeManager { return results; } + /** + * Get cluster map as in network topology for this node manager. + * @return cluster map + */ + @Override + public NetworkTopology getClusterNetworkTopologyMap() { + return clusterMap; + } + private String nodeResolve(String hostname) { List<String> hosts = new ArrayList<>(1); hosts.add(hostname); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java index 37525b0..57a377d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java @@ -67,6 +67,7 @@ public class Node2ObjectsMap<T> { * @param datanodeID -- Datanode UUID * @param containerIDs - List of ContainerIDs. 
*/ + @VisibleForTesting public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs) throws SCMException { Preconditions.checkNotNull(containerIDs); @@ -83,7 +84,8 @@ public class Node2ObjectsMap<T> { * * @param datanodeID - Datanode ID. */ - void removeDatanode(UUID datanodeID) { + @VisibleForTesting + public void removeDatanode(UUID datanodeID) { Preconditions.checkNotNull(datanodeID); dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java index f8633f9..714188d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java @@ -42,7 +42,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> { } /** - * Returns null if there no pipelines associated with this datanode ID. + * Returns null if there are no pipelines associated with this datanode ID. * * @param datanode - UUID * @return Set of pipelines or Null. @@ -52,6 +52,16 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> { } /** + * Return 0 if there are no pipelines associated with this datanode ID. + * @param datanode - UUID + * @return Number of pipelines or 0. + */ + public int getPipelinesCount(UUID datanode) { + Set<PipelineID> pipelines = getObjects(datanode); + return pipelines == null ? 0 : pipelines.size(); + } + + /** * Adds a pipeline entry to a given dataNode in the map. 
* * @param pipeline Pipeline to be added diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java new file mode 100644 index 0000000..cb9954d --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMCommonPolicy; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.Node; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Pipeline placement policy that chooses datanodes based on load balancing + * and network topology to supply pipeline creation. + * <p> + * 1. get a list of healthy nodes + * 2. keep only nodes that are not too heavily engaged in other pipelines + * 3. Choose an anchor node among the viable nodes. + * 4. Choose other nodes around the anchor node based on network topology + */ +public final class PipelinePlacementPolicy extends SCMCommonPolicy { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(PipelinePlacementPolicy.class); + private final NodeManager nodeManager; + private final Configuration conf; + private final int heavyNodeCriteria; + + /** + * Constructs a pipeline placement policy that considers network topology, + * load balancing and rack awareness. 
+ * + * @param nodeManager Node Manager + * @param conf Configuration + */ + public PipelinePlacementPolicy( + final NodeManager nodeManager, final Configuration conf) { + super(nodeManager, conf); + this.nodeManager = nodeManager; + this.conf = conf; + heavyNodeCriteria = conf.getInt( + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); + } + + /** + * Returns true if this node meets the criteria. + * + * @param datanodeDetails DatanodeDetails + * @return true if the node's pipeline count is within heavyNodeLimit. + */ + @VisibleForTesting + boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) { + return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit); + } + + /** + * Filter out viable nodes based on + * 1. nodes that are healthy + * 2. nodes that are not too heavily engaged in other pipelines + * + * @param excludedNodes - excluded nodes + * @param nodesRequired - number of datanodes required. + * @return a list of viable nodes + * @throws SCMException when viable nodes are not enough in numbers + */ + List<DatanodeDetails> filterViableNodes( + List<DatanodeDetails> excludedNodes, int nodesRequired) + throws SCMException { + // get nodes in HEALTHY state + List<DatanodeDetails> healthyNodes = + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + if (excludedNodes != null) { + healthyNodes.removeAll(excludedNodes); + } + String msg; + if (healthyNodes.size() == 0) { + msg = "No healthy node found to allocate pipeline."; + LOG.error(msg); + throw new SCMException(msg, SCMException.ResultCodes + .FAILED_TO_FIND_HEALTHY_NODES); + } + + if (healthyNodes.size() < nodesRequired) { + msg = String.format("Not enough healthy nodes to allocate pipeline. %d " + + " datanodes required. 
Found %d", + nodesRequired, healthyNodes.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + + // filter nodes that meet the size and pipeline engagement criteria. + // Pipeline placement doesn't take node space left into account. + List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d -> + meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList()); + + if (healthyList.size() < nodesRequired) { + msg = String.format("Unable to find enough nodes that meet " + + "the criteria that cannot engage in more than %d pipelines." + + " Nodes required: %d Found: %d", + heavyNodeCriteria, nodesRequired, healthyList.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return healthyList; + } + + /** + * Pipeline placement choose datanodes to join the pipeline. + * + * @param excludedNodes - excluded nodes + * @param favoredNodes - list of nodes preferred. + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. + * @return a list of chosen datanodeDetails + * @throws SCMException when chosen nodes are not enough in numbers + */ + @Override + public List<DatanodeDetails> chooseDatanodes( + List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes, + int nodesRequired, final long sizeRequired) throws SCMException { + // get a list of viable nodes based on criteria + List<DatanodeDetails> healthyNodes = + filterViableNodes(excludedNodes, nodesRequired); + + List<DatanodeDetails> results = new ArrayList<>(); + + // Randomly picks nodes when all nodes are equal. + // This happens when network topology is absent or + // all nodes are on the same rack. + if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) { + LOG.info("All nodes are considered equal. Now randomly pick nodes. 
" + + "Required nodes: {}", nodesRequired); + results = super.getResultSet(nodesRequired, healthyNodes); + if (results.size() < nodesRequired) { + LOG.error("Unable to find the required number of healthy nodes that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return results; + } + + // Since nodes are widely distributed, the results should be selected + // base on distance in topology, rack awareness and load balancing. + List<DatanodeDetails> exclude = new ArrayList<>(); + exclude.addAll(excludedNodes); + // First choose an anchor nodes randomly + DatanodeDetails anchor = chooseNode(healthyNodes); + if (anchor == null) { + LOG.error("Unable to find the first healthy nodes that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + + results.add(anchor); + exclude.add(anchor); + nodesRequired--; + + // Choose the second node on different racks from anchor. + DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness( + healthyNodes, excludedNodes, + nodeManager.getClusterNetworkTopologyMap(), anchor); + if (nodeOnDifferentRack == null) { + LOG.error("Unable to find nodes on different racks that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + + results.add(nodeOnDifferentRack); + exclude.add(nodeOnDifferentRack); + nodesRequired--; + + // Then choose nodes close to anchor based on network topology + for (int x = 0; x < nodesRequired; x++) { + // invoke the choose function defined in the derived classes. 
+ DatanodeDetails pick = chooseNodeFromNetworkTopology( + nodeManager.getClusterNetworkTopologyMap(), anchor, exclude); + if (pick != null) { + results.add(pick); + // exclude the picked node for next time + exclude.add(pick); + } + } + + if (results.size() < nodesRequired) { + LOG.error("Unable to find the required number of healthy nodes that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return results; + } + + /** + * Find a node from the healthy list and return it after removing it from the + * list that we are operating on. + * + * @param healthyNodes - Set of healthy nodes we can choose from. + * @return chosen datanodDetails + */ + @Override + public DatanodeDetails chooseNode( + List<DatanodeDetails> healthyNodes) { + int firstNodeNdx = getRand().nextInt(healthyNodes.size()); + int secondNodeNdx = getRand().nextInt(healthyNodes.size()); + + DatanodeDetails datanodeDetails; + // There is a possibility that both numbers will be same. + // if that is so, we just return the node. + if (firstNodeNdx == secondNodeNdx) { + datanodeDetails = healthyNodes.get(firstNodeNdx); + } else { + DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx); + DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx); + SCMNodeMetric firstNodeMetric = + nodeManager.getNodeStat(firstNodeDetails); + SCMNodeMetric secondNodeMetric = + nodeManager.getNodeStat(secondNodeDetails); + datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get()) + ? firstNodeDetails : secondNodeDetails; + } + // the pick is decided and it should be removed from candidates. + healthyNodes.remove(datanodeDetails); + return datanodeDetails; + } + + /** + * Choose node on different racks as anchor is on based on rack awareness. + * If a node on different racks cannot be found, then return a random node. 
+ * @param healthyNodes healthy nodes + * @param excludedNodes excluded nodes + * @param networkTopology network topology + * @param anchor anchor node + * @return a node on a different rack, or null if none is found + */ + @VisibleForTesting + protected DatanodeDetails chooseNodeBasedOnRackAwareness( + List<DatanodeDetails> healthyNodes, List<DatanodeDetails> excludedNodes, + NetworkTopology networkTopology, DatanodeDetails anchor) { + Preconditions.checkArgument(networkTopology != null); + if (checkAllNodesAreEqual(networkTopology)) { + return null; + } + + for (DatanodeDetails node : healthyNodes) { + if (excludedNodes.contains(node) + || networkTopology.isSameParent(anchor, node)) { + continue; + } else { + // the pick is decided and it should be removed from candidates. + healthyNodes.remove(node); + return node; + } + } + return null; + } + + /** + * Check if all nodes are equal in topology. + * They are equal when network topology is absent or they are on + * the same rack. + * @param topology network topology + * @return true when all nodes are equal + */ + private boolean checkAllNodesAreEqual(NetworkTopology topology) { + if (topology == null) { + return true; + } + return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1); + } + + /** + * Choose node based on network topology. 
+ * @param networkTopology network topology + * @param anchor anchor datanode to start with + * @param excludedNodes excluded datanodes + * @return chosen datanode + */ + @VisibleForTesting + protected DatanodeDetails chooseNodeFromNetworkTopology( + NetworkTopology networkTopology, DatanodeDetails anchor, + List<DatanodeDetails> excludedNodes) { + Preconditions.checkArgument(networkTopology != null); + + Collection<Node> excluded = new ArrayList<>(); + if (excludedNodes != null && excludedNodes.size() != 0) { + excluded.addAll(excludedNodes); + } + excluded.add(anchor); + + Node pick = networkTopology.chooseRandom( + anchor.getNetworkLocation(), excluded); + DatanodeDetails pickedNode = (DatanodeDetails) pick; + // exclude the picked node for next time + if (excludedNodes != null) { + excludedNodes.add(pickedNode); + } + return pickedNode; + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 87cc177..613146d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -16,11 +16,13 @@ */ package org.apache.hadoop.hdds.scm.container; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.net.Node; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -86,7 +88,7 @@ public class MockNodeManager implements NodeManager { private final SCMNodeStat aggregateStat; 
private boolean safemode; private final Map<UUID, List<SCMCommand>> commandMap; - private final Node2PipelineMap node2PipelineMap; + private Node2PipelineMap node2PipelineMap; private final Node2ContainerMap node2ContainerMap; private NetworkTopology clusterMap; private ConcurrentMap<String, Set<String>> dnsToUuidMap; @@ -100,6 +102,7 @@ public class MockNodeManager implements NodeManager { this.node2ContainerMap = new Node2ContainerMap(); this.dnsToUuidMap = new ConcurrentHashMap<>(); aggregateStat = new SCMNodeStat(); + clusterMap = new NetworkTopologyImpl(new Configuration()); if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { DatanodeDetails dd = TestUtils.randomDatanodeDetails(); @@ -251,6 +254,16 @@ public class MockNodeManager implements NodeManager { } /** + * Get the count of pipelines a datanodes is associated with. + * @param datanodeDetails DatanodeDetails + * @return The number of pipelines + */ + @Override + public int getPipelinesCount(DatanodeDetails datanodeDetails) { + return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid()); + } + + /** * Add pipeline information in the NodeManager. * @param pipeline - Pipeline to be added */ @@ -260,6 +273,22 @@ public class MockNodeManager implements NodeManager { } /** + * Get the entire Node2PipelineMap. + * @return Node2PipelineMap + */ + public Node2PipelineMap getNode2PipelineMap() { + return node2PipelineMap; + } + + /** + * Set the Node2PipelineMap. + * @param node2PipelineMap Node2PipelineMap + */ + public void setNode2PipelineMap(Node2PipelineMap node2PipelineMap) { + this.node2PipelineMap = node2PipelineMap; + } + + /** * Remove a pipeline information from the NodeManager. 
* @param pipeline - Pipeline to be removed */ @@ -517,6 +546,11 @@ public class MockNodeManager implements NodeManager { return results; } + @Override + public NetworkTopology getClusterNetworkTopologyMap() { + return clusterMap; + } + public void setNetworkTopology(NetworkTopology topology) { this.clusterMap = topology; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java new file mode 100644 index 0000000..2e0d0b1 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.*; +import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * Test for PipelinePlacementPolicy. + */ +public class TestPipelinePlacementPolicy { + private MockNodeManager nodeManager; + private PipelinePlacementPolicy placementPolicy; + private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10; + + @Before + public void init() throws Exception { + nodeManager = new MockNodeManager(true, + PIPELINE_PLACEMENT_MAX_NODES_COUNT); + placementPolicy = + new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration()); + } + + @Test + public void testChooseNodeBasedOnNetworkTopology() { + List<DatanodeDetails> healthyNodes = + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes); + // anchor should be removed from healthyNodes after being chosen. + Assert.assertFalse(healthyNodes.contains(anchor)); + + List<DatanodeDetails> excludedNodes = + new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT); + DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology( + nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes); + // excludedNodes should contain nextNode after being chosen. + Assert.assertTrue(excludedNodes.contains(nextNode)); + // nextNode should not be the same as anchor. 
+ Assert.assertTrue(anchor.getUuid() != nextNode.getUuid()); + } + + @Test + public void testChooseNodeBasedOnRackAwareness() { + List<DatanodeDetails> healthyNodes = overWriteLocationInNodes( + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY)); + DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes); + NetworkTopology topologyWithDifRacks = + createNetworkTopologyOnDifRacks(); + DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness( + healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT), + topologyWithDifRacks, anchor); + Assert.assertFalse(topologyWithDifRacks.isSameParent(anchor, nextNode)); + } + + private final static Node[] NODES = new NodeImpl[] { + new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT), + }; + + + private NetworkTopology createNetworkTopologyOnDifRacks() { + NetworkTopology topology = new NetworkTopologyImpl(new Configuration()); + for (Node n : NODES) { + topology.add(n); + } + return topology; + } + + private List<DatanodeDetails> overWriteLocationInNodes( + List<DatanodeDetails> datanodes) { + List<DatanodeDetails> results = new ArrayList<>(datanodes.size()); + for (int i = 0; i < datanodes.size(); i++) { + DatanodeDetails datanode = datanodes.get(i); + DatanodeDetails result = DatanodeDetails.newBuilder() + .setUuid(datanode.getUuidString()) + .setHostName(datanode.getHostName()) + .setIpAddress(datanode.getIpAddress()) + .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE)) + .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS)) + 
.addPort(datanode.getPort(DatanodeDetails.Port.Name.REST)) + .setNetworkLocation(NODES[i].getNetworkLocation()).build(); + results.add(result); + } + return results; + } + + @Test + public void testHeavyNodeShouldBeExcluded() throws SCMException{ + List<DatanodeDetails> healthyNodes = + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + int nodesRequired = healthyNodes.size()/2; + // only minority of healthy NODES are heavily engaged in pipelines. + int minorityHeavy = healthyNodes.size()/2 - 1; + List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes( + new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT), + new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT), + nodesRequired, 0); + // modify node to pipeline mapping. + insertHeavyNodesIntoNodeManager(healthyNodes, minorityHeavy); + // NODES should be sufficient. + Assert.assertEquals(nodesRequired, pickedNodes1.size()); + // make sure pipeline placement policy won't select duplicated NODES. + Assert.assertTrue(checkDuplicateNodesUUID(pickedNodes1)); + + // majority of healthy NODES are heavily engaged in pipelines. + int majorityHeavy = healthyNodes.size()/2 + 2; + insertHeavyNodesIntoNodeManager(healthyNodes, majorityHeavy); + boolean thrown = false; + List<DatanodeDetails> pickedNodes2 = null; + try { + pickedNodes2 = placementPolicy.chooseDatanodes( + new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT), + new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT), + nodesRequired, 0); + } catch (SCMException e) { + Assert.assertFalse(thrown); + thrown = true; + } + // NODES should NOT be sufficient and exception should be thrown. + Assert.assertNull(pickedNodes2); + Assert.assertTrue(thrown); + } + + private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) { + HashSet<UUID> uuids = nodes.stream(). + map(DatanodeDetails::getUuid). 
+ collect(Collectors.toCollection(HashSet::new)); + return uuids.size() == nodes.size(); + } + + private Set<PipelineID> mockPipelineIDs(int count) { + Set<PipelineID> pipelineIDs = new HashSet<>(count); + for (int i = 0; i < count; i++) { + pipelineIDs.add(PipelineID.randomId()); + } + return pipelineIDs; + } + + private void insertHeavyNodesIntoNodeManager( + List<DatanodeDetails> nodes, int heavyNodeCount) throws SCMException{ + if (nodes == null) { + throw new SCMException("", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + + int considerHeavyCount = + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1; + + Node2PipelineMap mockMap = new Node2PipelineMap(); + for (DatanodeDetails node : nodes) { + // mock heavy node + if (heavyNodeCount > 0) { + mockMap.insertNewDatanode( + node.getUuid(), mockPipelineIDs(considerHeavyCount)); + heavyNodeCount--; + } else { + mockMap.insertNewDatanode(node.getUuid(), mockPipelineIDs(1)); + } + } + nodeManager.setNode2PipelineMap(mockMap); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 0ecff3f..7e8ec52 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import 
org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -166,6 +167,16 @@ public class ReplicationNodeManagerMock implements NodeManager { } /** + * Get the count of pipelines a datanodes is associated with. + * @param dnId DatanodeDetails + * @return The number of pipelines + */ + @Override + public int getPipelinesCount(DatanodeDetails dnId) { + throw new UnsupportedOperationException("Not yet implemented"); + } + + /** * Add pipeline information in the NodeManager. * @param pipeline - Pipeline to be added */ @@ -327,4 +338,9 @@ public class ReplicationNodeManagerMock implements NodeManager { public List<DatanodeDetails> getNodesByAddress(String address) { return new LinkedList<>(); } + + @Override + public NetworkTopology getClusterNetworkTopologyMap() { + return null; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org