This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new fac26e1 HDDS-3139. Pipeline placement should max out pipeline usage (#668) fac26e1 is described below commit fac26e134b6ec6c0cc85e1dc867d66aea82c0b8a Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Tue Apr 21 23:15:32 2020 +0800 HDDS-3139. Pipeline placement should max out pipeline usage (#668) --- .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 134 +++++++++------------ .../scm/pipeline/TestPipelinePlacementPolicy.java | 76 ++++++++++-- 2 files changed, 124 insertions(+), 86 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java index b6a6858..68fe65d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java @@ -25,16 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.net.NetworkTopology; -import org.apache.hadoop.hdds.scm.net.Node; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -76,19 +74,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT); } - /** - * Returns true if this node meets 
the criteria. - * - * @param datanodeDetails DatanodeDetails - * @param nodesRequired nodes required count - * @return true if we have enough space. - */ - @VisibleForTesting - boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) { - if (heavyNodeCriteria == 0) { - // no limit applied. - return true; - } + int currentPipelineCount(DatanodeDetails datanodeDetails, int nodesRequired) { + // Datanodes from pipeline in some states can also be considered available // for pipeline allocation. Thus the number of these pipeline shall be // deducted from total heaviness calculation. @@ -110,21 +97,16 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { pipelineNumDeductable++; } } - boolean meet = (nodeManager.getPipelinesCount(datanodeDetails) - - pipelineNumDeductable) < heavyNodeCriteria; - if (!meet && LOG.isDebugEnabled()) { - LOG.debug("Pipeline Placement: can't place more pipeline on heavy " + - "datanodeļ¼ " + datanodeDetails.getUuid().toString() + - " Heaviness: " + nodeManager.getPipelinesCount(datanodeDetails) + - " limit: " + heavyNodeCriteria); - } - return meet; + return pipelines.size() - pipelineNumDeductable; } + + /** * Filter out viable nodes based on * 1. nodes that are healthy * 2. nodes that are not too heavily engaged in other pipelines + * The results are sorted based on pipeline count of each node. * * @param excludedNodes - excluded nodes * @param nodesRequired - number of datanodes required. @@ -154,8 +136,15 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { // filter nodes that meet the size and pipeline engagement criteria. // Pipeline placement doesn't take node space left into account. + // Sort the DNs by pipeline load. + // TODO check if sorting could cause performance issue: HDDS-3466. 
List<DatanodeDetails> healthyList = healthyNodes.stream() - .filter(d -> meetCriteria(d, nodesRequired)) + .map(d -> + new DnWithPipelines(d, currentPipelineCount(d, nodesRequired))) + .filter(d -> + ((d.getPipelines() < heavyNodeCriteria) || heavyNodeCriteria == 0)) + .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines)) + .map(d -> d.getDn()) .collect(Collectors.toList()); if (healthyList.size() < nodesRequired) { @@ -253,7 +242,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { // Since nodes are widely distributed, the results should be selected // base on distance in topology, rack awareness and load balancing. List<DatanodeDetails> exclude = new ArrayList<>(); - // First choose an anchor nodes randomly + // First choose an anchor node. DatanodeDetails anchor = chooseNode(healthyNodes); if (anchor != null) { results.add(anchor); @@ -291,8 +280,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { // Pick remaining nodes based on the existence of rack awareness. DatanodeDetails pick = null; if (rackAwareness) { - pick = chooseNodeFromNetworkTopology( - nodeManager.getClusterNetworkTopologyMap(), anchor, exclude); + pick = chooseNodeBasedOnSameRack( + healthyNodes, exclude, + nodeManager.getClusterNetworkTopologyMap(), anchor); } // fall back protection if (pick == null) { @@ -333,24 +323,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { if (healthyNodes == null || healthyNodes.isEmpty()) { return null; } - int firstNodeNdx = getRand().nextInt(healthyNodes.size()); - int secondNodeNdx = getRand().nextInt(healthyNodes.size()); - - DatanodeDetails datanodeDetails; - // There is a possibility that both numbers will be same. - // if that is so, we just return the node. 
- if (firstNodeNdx == secondNodeNdx) { - datanodeDetails = healthyNodes.get(firstNodeNdx); - } else { - DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx); - DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx); - SCMNodeMetric firstNodeMetric = - nodeManager.getNodeStat(firstNodeDetails); - SCMNodeMetric secondNodeMetric = - nodeManager.getNodeStat(secondNodeDetails); - datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get()) - ? firstNodeDetails : secondNodeDetails; - } + DatanodeDetails datanodeDetails = healthyNodes.get(0); healthyNodes.remove(datanodeDetails); return datanodeDetails; } @@ -373,13 +346,31 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { return null; } - for (DatanodeDetails node : healthyNodes) { - if (excludedNodes.contains(node) || - anchor.getNetworkLocation().equals(node.getNetworkLocation())) { - continue; - } else { - return node; - } + List<DatanodeDetails> nodesOnOtherRack = healthyNodes.stream().filter( + p -> !excludedNodes.contains(p) + && !anchor.getNetworkLocation().equals(p.getNetworkLocation())) + .collect(Collectors.toList()); + if (!nodesOnOtherRack.isEmpty()) { + return nodesOnOtherRack.get(0); + } + return null; + } + + @VisibleForTesting + protected DatanodeDetails chooseNodeBasedOnSameRack( + List<DatanodeDetails> healthyNodes, List<DatanodeDetails> excludedNodes, + NetworkTopology networkTopology, DatanodeDetails anchor) { + Preconditions.checkArgument(networkTopology != null); + if (checkAllNodesAreEqual(networkTopology)) { + return null; + } + + List<DatanodeDetails> nodesOnSameRack = healthyNodes.stream().filter( + p -> !excludedNodes.contains(p) + && anchor.getNetworkLocation().equals(p.getNetworkLocation())) + .collect(Collectors.toList()); + if (!nodesOnSameRack.isEmpty()) { + return nodesOnSameRack.get(0); } return null; } @@ -398,31 +389,22 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { return 
(topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1); } - /** - * Choose node based on network topology. - * @param networkTopology network topology - * @param anchor anchor datanode to start with - * @param excludedNodes excluded datanodes - * @return chosen datanode - */ - @VisibleForTesting - protected DatanodeDetails chooseNodeFromNetworkTopology( - NetworkTopology networkTopology, DatanodeDetails anchor, - List<DatanodeDetails> excludedNodes) { - Preconditions.checkArgument(networkTopology != null); + private static class DnWithPipelines { + private DatanodeDetails dn; + private int pipelines; + + DnWithPipelines(DatanodeDetails dn, int pipelines) { + this.dn = dn; + this.pipelines = pipelines; + } - Collection<Node> excluded = new ArrayList<>(); - if (excludedNodes != null && excludedNodes.size() != 0) { - excluded.addAll(excludedNodes); + public int getPipelines() { + return pipelines; } - Node pick = networkTopology.chooseRandom( - anchor.getNetworkLocation(), excluded); - DatanodeDetails pickedNode = (DatanodeDetails) pick; - if (pickedNode == null) { - LOG.debug("Pick node is null, excluded nodes {}, anchor {}.", - excluded, anchor); + public DatanodeDetails getDn() { + return dn; } - return pickedNode; } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java index b8b8622..426a643 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java @@ -41,27 +41,37 @@ import org.apache.hadoop.hdds.scm.net.NodeSchema; import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; /** * Test for PipelinePlacementPolicy. */ public class TestPipelinePlacementPolicy { private MockNodeManager nodeManager; + private PipelineStateManager stateManager; private OzoneConfiguration conf; private PipelinePlacementPolicy placementPolicy; private NetworkTopologyImpl cluster; private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10; + private static final int PIPELINE_LOAD_LIMIT = 5; private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>(); private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>(); + static final Logger LOG = + LoggerFactory.getLogger(TestPipelinePlacementPolicy.class); + @Before public void init() throws Exception { cluster = initTopology(); @@ -69,9 +79,10 @@ public class TestPipelinePlacementPolicy { nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(), false, PIPELINE_PLACEMENT_MAX_NODES_COUNT); conf = new OzoneConfiguration(); - conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5); + conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT); + stateManager = new PipelineStateManager(); placementPolicy = new PipelinePlacementPolicy( - nodeManager, new PipelineStateManager(), conf); + nodeManager, stateManager, conf); } private NetworkTopologyImpl initTopology() { @@ -85,11 +96,14 @@ public class TestPipelinePlacementPolicy { private List<DatanodeDetails> getNodesWithRackAwareness() { List<DatanodeDetails> datanodes = new ArrayList<>(); - for (Node node : NODES) { + int iter = 0; + int delimiter = NODES.length; + while (iter < 
PIPELINE_PLACEMENT_MAX_NODES_COUNT) { DatanodeDetails datanode = overwriteLocationInNode( - getNodesWithoutRackAwareness(), node); + getNodesWithoutRackAwareness(), NODES[iter % delimiter]); nodesWithRackAwareness.add(datanode); datanodes.add(datanode); + iter++; } return datanodes; } @@ -101,7 +115,7 @@ public class TestPipelinePlacementPolicy { } @Test - public void testChooseNodeBasedOnNetworkTopology() throws SCMException { + public void testChooseNodeBasedOnNetworkTopology() { DatanodeDetails anchor = placementPolicy.chooseNode(nodesWithRackAwareness); // anchor should be removed from healthyNodes after being chosen. Assert.assertFalse(nodesWithRackAwareness.contains(anchor)); @@ -109,8 +123,11 @@ public class TestPipelinePlacementPolicy { List<DatanodeDetails> excludedNodes = new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT); excludedNodes.add(anchor); - DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology( - nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes); + DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnSameRack( + nodesWithRackAwareness, excludedNodes, + nodeManager.getClusterNetworkTopologyMap(), anchor); + //DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology( + // nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes); Assert.assertFalse(excludedNodes.contains(nextNode)); // next node should not be the same as anchor. 
Assert.assertTrue(anchor.getUuid() != nextNode.getUuid()); @@ -149,6 +166,45 @@ public class TestPipelinePlacementPolicy { } @Test + public void testPickLowestLoadAnchor() throws IOException{ + List<DatanodeDetails> healthyNodes = nodeManager + .getNodes(HddsProtos.NodeState.HEALTHY); + + int maxPipelineCount = PIPELINE_LOAD_LIMIT * healthyNodes.size() + / HddsProtos.ReplicationFactor.THREE.getNumber(); + for (int i = 0; i < maxPipelineCount; i++) { + try { + List<DatanodeDetails> nodes = placementPolicy.chooseDatanodes(null, + null, HddsProtos.ReplicationFactor.THREE.getNumber(), 0); + + Pipeline pipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(Pipeline.PipelineState.ALLOCATED) + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE) + .setNodes(nodes) + .build(); + nodeManager.addPipeline(pipeline); + stateManager.addPipeline(pipeline); + } catch (SCMException e) { + break; + } + } + + // Every node should be evenly used. + int averageLoadOnNode = maxPipelineCount * + HddsProtos.ReplicationFactor.THREE.getNumber() / healthyNodes.size(); + for (DatanodeDetails node : healthyNodes) { + Assert.assertTrue(nodeManager.getPipelinesCount(node) + >= averageLoadOnNode); + } + + // Should max out pipeline usage. + Assert.assertEquals(maxPipelineCount, + stateManager.getPipelines(HddsProtos.ReplicationType.RATIS).size()); + } + + @Test public void testChooseNodeBasedOnRackAwareness() { List<DatanodeDetails> healthyNodes = overWriteLocationInNodes( nodeManager.getNodes(HddsProtos.NodeState.HEALTHY)); --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org