[ https://issues.apache.org/jira/browse/HDDS-1577?focusedWorklogId=303343&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-303343 ]
ASF GitHub Bot logged work on HDDS-1577: ---------------------------------------- Author: ASF GitHub Bot Created on: 29/Aug/19 03:52 Start Date: 29/Aug/19 03:52 Worklog Time Spent: 10m Work Description: xiaoyuyao commented on pull request #1366: HDDS-1577. Add default pipeline placement policy implementation. URL: https://github.com/apache/hadoop/pull/1366#discussion_r318876175 ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java ########## @@ -0,0 +1,237 @@ +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMCommonPolicy; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.Node; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Pipeline placement policy that choose datanodes based on load balancing and network topology + * to supply pipeline creation. + * <p> + * 1. get a list of healthy nodes + * 2. filter out viable nodes that either don't have enough size left + * or are too heavily engaged in other pipelines + * 3. Choose an anchor node among the viable nodes which follows the algorithm + * described @SCMContainerPlacementCapacity + * 4. 
Choose other nodes around the anchor node based on network topology + */ +public final class PipelinePlacementPolicy extends SCMCommonPolicy { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(PipelinePlacementPolicy.class); + private final NodeManager nodeManager; + private final Configuration conf; + private final int heavy_node_criteria; + + /** + * Constructs a Container Placement with considering only capacity. + * That is this policy tries to place containers based on node weight. + * + * @param nodeManager Node Manager + * @param conf Configuration + */ + public PipelinePlacementPolicy(final NodeManager nodeManager, + final Configuration conf) { + super(nodeManager, conf); + this.nodeManager = nodeManager; + this.conf = conf; + heavy_node_criteria = conf.getInt(ScmConfigKeys.OZONE_SCM_DATANODE_MAX_PIPELINE_ENGAGEMENT, + ScmConfigKeys.OZONE_SCM_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); + } + + /** + * Returns true if this node meets the criteria. + * + * @param datanodeDetails DatanodeDetails + * @return true if we have enough space. + */ + boolean meetCriteria(DatanodeDetails datanodeDetails, + long sizeRequired) { + SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails); + boolean hasEnoughSpace = (nodeMetric != null) && (nodeMetric.get() != null) + && nodeMetric.get().getRemaining().hasResources(sizeRequired); + boolean loadNotTooHeavy = nodeManager.getPipelinesCount(datanodeDetails) <= heavy_node_criteria; + return hasEnoughSpace && loadNotTooHeavy; + } + + /** + * Filter out viable nodes based on + * 1. nodes that are healthy + * 2. nodes that have enough space + * 3. nodes that are not too heavily engaged in other pipelines + * @param excludedNodes - excluded nodes + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. 
+ * @return a list of viable nodes + * @throws SCMException when viable nodes are not enough in numbers + */ + List<DatanodeDetails> filterViableNodes(List<DatanodeDetails> excludedNodes, + int nodesRequired, final long sizeRequired) throws SCMException { + // get nodes in HEALTHY state + List<DatanodeDetails> healthyNodes = + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + if (excludedNodes != null) { + healthyNodes.removeAll(excludedNodes); + } + String msg; + if (healthyNodes.size() == 0) { + msg = "No healthy node found to allocate container."; + LOG.error(msg); + throw new SCMException(msg, SCMException.ResultCodes + .FAILED_TO_FIND_HEALTHY_NODES); + } + + if (healthyNodes.size() < nodesRequired) { + msg = String.format("Not enough healthy nodes to allocate container. %d " + + " datanodes required. Found %d", + nodesRequired, healthyNodes.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + + // filter nodes that meet the size and pipeline engagement criteria + List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d -> + meetCriteria(d, sizeRequired)).collect(Collectors.toList()); + + if (healthyList.size() < nodesRequired) { + msg = String.format("Unable to find enough nodes that meet the space " + + "requirement of %d bytes in healthy node set." + + " Nodes required: %d Found: %d", + sizeRequired, nodesRequired, healthyList.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE); + } + return healthyList; + } + + /** + * Called by pipeline placement in order to choose datanodes to join the pipeline. + * @param excludedNodes - excluded nodes + * @param favoredNodes - list of nodes preferred. + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. 
+ * @return a list of chosen datanodeDetails + * @throws SCMException when chosen nodes are not enough in numbers + */ + @Override + public List<DatanodeDetails> chooseDatanodes( + List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes, + int nodesRequired, final long sizeRequired) throws SCMException { + // get a list of viable nodes based on criteria + List<DatanodeDetails> healthyNodes = filterViableNodes(excludedNodes, nodesRequired, sizeRequired); + + List<DatanodeDetails> results = new ArrayList<>(); + + // First choose an anchor node randomly + DatanodeDetails anchor = chooseNode(healthyNodes); + if (anchor == null) { + LOG.error("Unable to find the first healthy nodes that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + + results.add(anchor); + nodesRequired --; + for (int x = 0; x < nodesRequired; x++) { + // invoke the choose function defined in the derived classes. + DatanodeDetails pick = chooseNextNode(healthyNodes, excludedNodes, anchor); + if (pick != null) { + results.add(pick); + } + } + + if (results.size() < nodesRequired) { + LOG.error("Unable to find the required number of healthy nodes that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return results; + + } + + /** + * Find a node from the healthy list and return it after removing it from the + * list that we are operating on. + * @param healthyNodes - Set of healthy nodes we can choose from. 
+ * @return chosen datanodeDetails + */ + @Override + public DatanodeDetails chooseNode( + List<DatanodeDetails> healthyNodes) { + int firstNodeNdx = getRand().nextInt(healthyNodes.size()); + int secondNodeNdx = getRand().nextInt(healthyNodes.size()); + + DatanodeDetails datanodeDetails; + // There is a possibility that both numbers will be the same. + // if that is so, we just return the node. + if (firstNodeNdx == secondNodeNdx) { + datanodeDetails = healthyNodes.get(firstNodeNdx); + } else { + DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx); + DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx); + SCMNodeMetric firstNodeMetric = + nodeManager.getNodeStat(firstNodeDetails); + SCMNodeMetric secondNodeMetric = + nodeManager.getNodeStat(secondNodeDetails); + datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get()) + ? firstNodeDetails : secondNodeDetails; + } + healthyNodes.remove(datanodeDetails); Review comment: We remove the first selected node by removing it from healthyNodes, the second and third by adding them to excludeNode. Can we add a unit test to verify the expected behavior? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 303343) Time Spent: 40m (was: 0.5h) > Add default pipeline placement policy implementation > ---------------------------------------------------- > > Key: HDDS-1577 > URL: https://issues.apache.org/jira/browse/HDDS-1577 > Project: Hadoop Distributed Data Store > Issue Type: Sub-task > Components: SCM > Reporter: Siddharth Wagle > Assignee: Li Cheng > Priority: Major > Labels: pull-request-available > Time Spent: 40m > Remaining Estimate: 0h > > This is a simpler implementation of the PipelinePlacementPolicy that can be > utilized if no network topology is defined for the cluster. We try to form > pipelines from existing HEALTHY datanodes randomly, as long as they satisfy > PipelinePlacementCriteria. -- This message was sent by Atlassian Jira (v8.3.2#803003) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org