This is an automated email from the ASF dual-hosted git repository. sodonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new 7d132ce HDDS-3179. Pipeline placement based on Topology does not have fallback (#678) 7d132ce is described below commit 7d132ce38d5d8aeb3b72e770f99881888c2753ee Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Fri Mar 27 15:29:59 2020 +0800 HDDS-3179. Pipeline placement based on Topology does not have fallback (#678) --- .../hadoop/hdds/protocol/DatanodeDetails.java | 2 +- .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 37 ++++-- .../hadoop/hdds/scm/container/MockNodeManager.java | 10 +- .../scm/pipeline/TestPipelinePlacementPolicy.java | 145 ++++++++++++++++----- 4 files changed, 145 insertions(+), 49 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index a235a4b..28ed36d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -70,7 +70,7 @@ public class DatanodeDetails extends NodeImpl implements this.certSerialId = certSerialId; } - protected DatanodeDetails(DatanodeDetails datanodeDetails) { + public DatanodeDetails(DatanodeDetails datanodeDetails) { super(datanodeDetails.getHostName(), datanodeDetails.getNetworkLocation(), datanodeDetails.getCost()); this.uuid = datanodeDetails.uuid; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java index 0f30449..e96b120 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java @@ -99,9 +99,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { try { pipeline = stateManager.getPipeline(pid); } catch (PipelineNotFoundException e) { - LOG.error("Pipeline not found in pipeline state manager during" + - " pipeline creation. PipelineID: " + pid + - " exception: " + e.getMessage()); + LOG.debug("Pipeline not found in pipeline state manager during" + + " pipeline creation. PipelineID: {}", pid, e); continue; } if (pipeline != null && @@ -282,26 +281,32 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { LOG.debug("Second node chosen: {}", nextNode); } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Pipeline Placement: Unable to find 2nd node on different " + - "rack based on rack awareness."); - } + LOG.debug("Pipeline Placement: Unable to find 2nd node on different " + + "rack based on rack awareness. anchor: {}", anchor); } // Then choose nodes close to anchor based on network topology int nodesToFind = nodesRequired - results.size(); for (int x = 0; x < nodesToFind; x++) { // Pick remaining nodes based on the existence of rack awareness. - DatanodeDetails pick = rackAwareness - ? chooseNodeFromNetworkTopology( - nodeManager.getClusterNetworkTopologyMap(), anchor, exclude) - : fallBackPickNodes(healthyNodes, exclude); + DatanodeDetails pick = null; + if (rackAwareness) { + pick = chooseNodeFromNetworkTopology( + nodeManager.getClusterNetworkTopologyMap(), anchor, exclude); + } + // fall back protection + if (pick == null) { + pick = fallBackPickNodes(healthyNodes, exclude); + if (rackAwareness) { + LOG.debug("Failed to choose node based on topology. Fallback " + + "picks node as: {}", pick); + } + } + if (pick != null) { results.add(pick); exclude.add(pick); - if (LOG.isDebugEnabled()) { - LOG.debug("Remaining node chosen: {}", pick); - } + LOG.debug("Remaining node chosen: {}", pick); } } @@ -414,6 +419,10 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { Node pick = networkTopology.chooseRandom( anchor.getNetworkLocation(), excluded); DatanodeDetails pickedNode = (DatanodeDetails) pick; + if (pickedNode == null) { + LOG.debug("Pick node is null, excluded nodes {}, anchor {}.", + excluded, anchor); + } return pickedNode; } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index cbeef7f..f15bfdd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -94,6 +94,7 @@ public class MockNodeManager implements NodeManager { private ConcurrentMap<String, Set<String>> dnsToUuidMap; public MockNodeManager(NetworkTopologyImpl clusterMap, + List<DatanodeDetails> nodes, boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); this.staleNodes = new LinkedList<>(); @@ -104,6 +105,13 @@ public class MockNodeManager implements NodeManager { this.dnsToUuidMap = new ConcurrentHashMap<>(); this.aggregateStat = new SCMNodeStat(); this.clusterMap = clusterMap; + if (!nodes.isEmpty()) { + for (int x = 0; x < nodes.size(); x++) { + DatanodeDetails node = nodes.get(x); + register(node, null, null); + populateNodeMetric(node, x); + } + } if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { DatanodeDetails dd = MockDatanodeDetails.randomDatanodeDetails(); @@ -116,7 +124,7 @@ public class MockNodeManager implements NodeManager { } public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { - this(new NetworkTopologyImpl(new OzoneConfiguration()), + this(new NetworkTopologyImpl(new OzoneConfiguration()), new ArrayList<>(), initializeFakeNodes, nodeCount); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java index daad808..fafc4b0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.MockNodeManager; @@ -35,6 +36,9 @@ import java.util.*; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; +import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; /** * Test for PipelinePlacementPolicy. @@ -43,25 +47,55 @@ public class TestPipelinePlacementPolicy { private MockNodeManager nodeManager; private OzoneConfiguration conf; private PipelinePlacementPolicy placementPolicy; + private NetworkTopologyImpl cluster; private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10; + private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>(); + private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>(); + @Before public void init() throws Exception { - nodeManager = new MockNodeManager(true, - PIPELINE_PLACEMENT_MAX_NODES_COUNT); + cluster = initTopology(); + // start with nodes with rack awareness. + nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(), + false, PIPELINE_PLACEMENT_MAX_NODES_COUNT); conf = new OzoneConfiguration(); conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5); placementPolicy = new PipelinePlacementPolicy( nodeManager, new PipelineStateManager(), conf); } + private NetworkTopologyImpl initTopology() { + NodeSchema[] schemas = new NodeSchema[] + {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}; + NodeSchemaManager.getInstance().init(schemas, true); + NetworkTopologyImpl topology = + new NetworkTopologyImpl(NodeSchemaManager.getInstance()); + return topology; + } + + private List<DatanodeDetails> getNodesWithRackAwareness() { + List<DatanodeDetails> datanodes = new ArrayList<>(); + for (Node node : NODES) { + DatanodeDetails datanode = overwriteLocationInNode( + getNodesWithoutRackAwareness(), node); + nodesWithRackAwareness.add(datanode); + datanodes.add(datanode); + } + return datanodes; + } + + private DatanodeDetails getNodesWithoutRackAwareness() { + DatanodeDetails node = MockDatanodeDetails.randomDatanodeDetails(); + nodesWithOutRackAwareness.add(node); + return node; + } + @Test - public void testChooseNodeBasedOnNetworkTopology() { - List<DatanodeDetails> healthyNodes = - nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); - DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes); + public void testChooseNodeBasedOnNetworkTopology() throws SCMException { + DatanodeDetails anchor = placementPolicy.chooseNode(nodesWithRackAwareness); // anchor should be removed from healthyNodes after being chosen. - Assert.assertFalse(healthyNodes.contains(anchor)); + Assert.assertFalse(nodesWithRackAwareness.contains(anchor)); List<DatanodeDetails> excludedNodes = new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT); @@ -69,11 +103,43 @@ public class TestPipelinePlacementPolicy { DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology( nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes); Assert.assertFalse(excludedNodes.contains(nextNode)); - // nextNode should not be the same as anchor. + // next node should not be the same as anchor. Assert.assertTrue(anchor.getUuid() != nextNode.getUuid()); + // next node should be on the same rack based on topology. + Assert.assertEquals(anchor.getNetworkLocation(), + nextNode.getNetworkLocation()); } @Test + public void testChooseNodeWithSingleNodeRack() throws SCMException { + // There is only one node on 3 racks altogether. + List<DatanodeDetails> datanodes = new ArrayList<>(); + for (Node node : SINGLE_NODE_RACK) { + DatanodeDetails datanode = overwriteLocationInNode( + MockDatanodeDetails.randomDatanodeDetails(), node); + datanodes.add(datanode); + } + MockNodeManager localNodeManager = new MockNodeManager(initTopology(), + datanodes, false, datanodes.size()); + PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy( + localNodeManager, new PipelineStateManager(), conf); + int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber(); + List<DatanodeDetails> results = localPlacementPolicy.chooseDatanodes( + new ArrayList<>(datanodes.size()), + new ArrayList<>(datanodes.size()), + nodesRequired, 0); + + Assert.assertEquals(nodesRequired, results.size()); + // 3 nodes should be on different racks. + Assert.assertNotEquals(results.get(0).getNetworkLocation(), + results.get(1).getNetworkLocation()); + Assert.assertNotEquals(results.get(0).getNetworkLocation(), + results.get(2).getNetworkLocation()); + Assert.assertNotEquals(results.get(1).getNetworkLocation(), + results.get(2).getNetworkLocation()); + } + + @Test public void testChooseNodeBasedOnRackAwareness() { List<DatanodeDetails> healthyNodes = overWriteLocationInNodes( nodeManager.getNodes(HddsProtos.NodeState.HEALTHY)); @@ -84,8 +150,9 @@ public class TestPipelinePlacementPolicy { healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT), topologyWithDifRacks, anchor); Assert.assertNotNull(nextNode); - Assert.assertFalse(anchor.getNetworkLocation().equals( - nextNode.getNetworkLocation())); + // next node should be on a different rack. + Assert.assertNotEquals(anchor.getNetworkLocation(), + nextNode.getNetworkLocation()); } @Test @@ -115,25 +182,25 @@ public class TestPipelinePlacementPolicy { @Test public void testRackAwarenessNotEnabledWithFallBack() throws SCMException{ - List<DatanodeDetails> healthyNodes = - nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); - DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes); - DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes); + DatanodeDetails anchor = placementPolicy + .chooseNode(nodesWithOutRackAwareness); + DatanodeDetails randomNode = placementPolicy + .chooseNode(nodesWithOutRackAwareness); // rack awareness is not enabled. Assert.assertTrue(anchor.getNetworkLocation().equals( randomNode.getNetworkLocation())); NetworkTopology topology = new NetworkTopologyImpl(new Configuration()); DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness( - healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT), - topology, anchor); + nodesWithOutRackAwareness, new ArrayList<>( + PIPELINE_PLACEMENT_MAX_NODES_COUNT), topology, anchor); // RackAwareness should not be able to choose any node. Assert.assertNull(nextNode); // PlacementPolicy should still be able to pick a set of 3 nodes. int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber(); List<DatanodeDetails> results = placementPolicy - .getResultSet(numOfNodes, healthyNodes); + .getResultSet(numOfNodes, nodesWithOutRackAwareness); Assert.assertEquals(numOfNodes, results.size()); // All nodes are on same rack. @@ -146,14 +213,20 @@ public class TestPipelinePlacementPolicy { private final static Node[] NODES = new NodeImpl[] { new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT), new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT), - new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT), - new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT), - new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT), - new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT), - new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT), - new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h3", "/r2", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h4", "/r2", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h5", "/r3", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h6", "/r3", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h7", "/r4", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h8", "/r4", NetConstants.NODE_COST_DEFAULT), }; + // 3 racks with single node. + private final static Node[] SINGLE_NODE_RACK = new NodeImpl[] { + new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h2", "/r2", NetConstants.NODE_COST_DEFAULT), + new NodeImpl("h3", "/r3", NetConstants.NODE_COST_DEFAULT) + }; private NetworkTopology createNetworkTopologyOnDifRacks() { NetworkTopology topology = new NetworkTopologyImpl(new Configuration()); @@ -163,20 +236,26 @@ public class TestPipelinePlacementPolicy { return topology; } + private DatanodeDetails overwriteLocationInNode( + DatanodeDetails datanode, Node node) { + DatanodeDetails result = DatanodeDetails.newBuilder() + .setUuid(datanode.getUuidString()) + .setHostName(datanode.getHostName()) + .setIpAddress(datanode.getIpAddress()) + .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE)) + .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS)) + .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST)) + .setNetworkLocation(node.getNetworkLocation()).build(); + return result; + } + private List<DatanodeDetails> overWriteLocationInNodes( List<DatanodeDetails> datanodes) { List<DatanodeDetails> results = new ArrayList<>(datanodes.size()); for (int i = 0; i < datanodes.size(); i++) { - DatanodeDetails datanode = datanodes.get(i); - DatanodeDetails result = DatanodeDetails.newBuilder() - .setUuid(datanode.getUuidString()) - .setHostName(datanode.getHostName()) - .setIpAddress(datanode.getIpAddress()) - .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE)) - .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS)) - .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST)) - .setNetworkLocation(NODES[i].getNetworkLocation()).build(); - results.add(result); + DatanodeDetails datanode = overwriteLocationInNode( + datanodes.get(i), NODES[i]); + results.add(datanode); } return results; } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org