HDFS-12725. BlockPlacementPolicyRackFaultTolerant fails with very uneven racks.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b00f828d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b00f828d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b00f828d

Branch: refs/heads/HDFS-9806
Commit: b00f828d84e4e029fd4786ebe827ce704a1b2a04
Parents: 6fc09be
Author: Xiao Chen <x...@apache.org>
Authored: Thu Nov 2 21:51:14 2017 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Thu Nov 2 21:53:13 2017 -0700

----------------------------------------------------------------------
 .../BlockPlacementPolicyRackFaultTolerant.java  |  70 ++++++++--
 .../hdfs/TestErasureCodingMultipleRacks.java    | 131 ++++++++++++++++---
 2 files changed, 173 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00f828d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index 1eac3ea..95c5c88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -62,10 +62,17 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
    * randomly.
    * 2. If total replica expected is bigger than numOfRacks, it choose:
    * 2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
-   * 2b. For some random racks, place one more replica to each one of them, until
-   * numOfReplicas have been chosen. <br>
-   * In the end, the difference of the numbers of replicas for each two racks
-   * is no more than 1.
+   * 2b. For some random racks, place one more replica on each one of them,
+   * until numOfReplicas have been chosen. <br>
+   * 3. If after step 2 there are still replicas not placed (because some
+   * racks have fewer datanodes than maxNodesPerRack), the remaining
+   * replicas are placed as evenly as possible on the racks that still
+   * have datanodes without a replica.
+   * 4. If after step 3 there are still replicas not placed, a
+   * {@link NotEnoughReplicasException} is thrown.
+   * <p>
+   * For normal setups, step 2 suffices. So in the end, the difference in
+   * the number of replicas between any two racks is no more than 1.
    * Either way it always prefer local storage.
    * @return local node of writer
    */
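
For readers new to this policy, the arithmetic behind steps 1 and 2 can be illustrated with a small standalone sketch. This is not code from the patch; the maxNodesPerRack formula below is an assumption chosen only to match the "difference of no more than 1" property described in the javadoc, and real HDFS picks the racks randomly rather than in order.

// Standalone illustration: spread totalReplicas across numRacks so that any
// two racks differ by at most one replica, mirroring steps 1-2 above.
public class EvenRackSpreadSketch {
  static int[] spreadEvenly(int totalReplicas, int numRacks) {
    int[] perRack = new int[numRacks];
    if (totalReplicas <= numRacks) {
      // Step 1: one replica on each of totalReplicas distinct racks.
      for (int i = 0; i < totalReplicas; i++) {
        perRack[i] = 1;
      }
      return perRack;
    }
    // Step 2a: fill each rack with (maxNodesPerRack - 1) replicas.
    int maxNodesPerRack = (totalReplicas - 1) / numRacks + 1;  // assumed ceiling
    for (int i = 0; i < numRacks; i++) {
      perRack[i] = maxNodesPerRack - 1;
    }
    // Step 2b: hand out the remainder, one extra replica per rack.
    int remaining = totalReplicas - numRacks * (maxNodesPerRack - 1);
    for (int i = 0; i < remaining; i++) {
      perRack[i]++;  // HDFS picks these racks randomly
    }
    return perRack;
  }

  public static void main(String[] args) {
    // 9 blocks (e.g. an RS-6-3 block group) across 4 racks -> [3, 2, 2, 2]
    System.out.println(java.util.Arrays.toString(spreadEvenly(9, 4)));
  }
}
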
@@ -132,24 +139,63 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
       chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
-      LOG.debug("Only able to place {} of {} (maxNodesPerRack={}) nodes " +
-          "evenly across racks, falling back to uneven placement.",
-          results.size(), numOfReplicas, maxNodesPerRack);
+      LOG.warn("Only able to place {} of total expected {}" +
+          " (maxNodesPerRack={}, numOfReplicas={}) nodes " +
+          "evenly across racks, falling back to placing them evenly on the " +
+          "remaining racks. This may not guarantee rack-level fault " +
+          "tolerance. Please check if the racks are configured properly.",
+          results.size(), totalReplicaExpected, maxNodesPerRack, numOfReplicas);
       LOG.debug("Caught exception was:", e);
+      chooseEvenlyFromRemainingRacks(writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes,
+          totalReplicaExpected, e);
+
+    }
+
+    return writer;
+  }
+
+  /**
+   * Choose as evenly as possible from the racks which have available datanodes.
+   */
+  private void chooseEvenlyFromRemainingRacks(Node writer,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, int totalReplicaExpected,
+      NotEnoughReplicasException e) throws NotEnoughReplicasException {
+    int numResultsOflastChoose = 0;
+    NotEnoughReplicasException lastException = e;
+    int bestEffortMaxNodesPerRack = maxNodesPerRack;
+    while (results.size() != totalReplicaExpected &&
+        numResultsOflastChoose != results.size()) {
       // Exclude the chosen nodes
+      final Set<Node> newExcludeNodes = new HashSet<>();
       for (DatanodeStorageInfo resultStorage : results) {
         addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
-            excludedNodes);
+            newExcludeNodes);
       }
       LOG.trace("Chosen nodes: {}", results);
       LOG.trace("Excluded nodes: {}", excludedNodes);
-      numOfReplicas = totalReplicaExpected - results.size();
-      chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
-          totalReplicaExpected, results, avoidStaleNodes, storageTypes);
+      LOG.trace("New Excluded nodes: {}", newExcludeNodes);
+      final int numOfReplicas = totalReplicaExpected - results.size();
+      numResultsOflastChoose = results.size();
+      try {
+        chooseOnce(numOfReplicas, writer, newExcludeNodes, blocksize,
+            ++bestEffortMaxNodesPerRack, results, avoidStaleNodes,
+            storageTypes);
+      } catch (NotEnoughReplicasException nere) {
+        lastException = nere;
+      } finally {
+        excludedNodes.addAll(newExcludeNodes);
+      }
     }
-    return writer;
+
+    if (numResultsOflastChoose != totalReplicaExpected) {
+      LOG.debug("Best effort placement failed: expecting {} replicas, only " +
+          "chose {}.", totalReplicaExpected, numResultsOflastChoose);
+      throw lastException;
+    }
   }

   /**
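
The best-effort fallback boils down to "retry with a relaxed per-rack cap until everything is placed or a full pass makes no progress". The sketch below is a simplified stand-in, not HDFS code: racks are plain capacity counters, placeOnce() is a deterministic stand-in for chooseOnce(), and it exists only to show why the loop terminates (the placed count can only grow, and the loop exits as soon as an iteration adds nothing).

// Simplified stand-in for chooseEvenlyFromRemainingRacks (hypothetical racks).
import java.util.Arrays;

public class BestEffortPlacementSketch {
  // Place up to `needed` more replicas, honoring a per-rack cap.
  static int placeOnce(int[] used, int[] capacity, int needed, int maxPerRack) {
    int placed = 0;
    for (int r = 0; r < capacity.length && placed < needed; r++) {
      while (used[r] < Math.min(capacity[r], maxPerRack) && placed < needed) {
        used[r]++;
        placed++;
      }
    }
    return placed;
  }

  public static void main(String[] args) {
    int[] capacity = {1, 1, 8};   // very uneven racks
    int[] used = new int[capacity.length];
    int totalExpected = 9;        // e.g. an RS-6-3 block group
    int maxPerRack = 3;           // the "even" target: ceil(9 / 3)

    int placed = placeOnce(used, capacity, totalExpected, maxPerRack);
    int lastPlaced = -1;
    // Same shape as the patch: relax the cap until done or no progress.
    while (placed != totalExpected && placed != lastPlaced) {
      lastPlaced = placed;
      maxPerRack++;  // mirrors ++bestEffortMaxNodesPerRack
      placed += placeOnce(used, capacity, totalExpected - placed, maxPerRack);
    }
    // Prints [1, 1, 7] placed=9 for the capacities above.
    System.out.println(Arrays.toString(used) + " placed=" + placed);
  }
}
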
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b00f828d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
index eb6213a..0689665d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
@@ -34,6 +35,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;

+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test erasure coding block placement with skewed # nodes per rack.
  */
@@ -42,10 +50,10 @@ public class TestErasureCodingMultipleRacks {
       LoggerFactory.getLogger(TestErasureCodingMultipleRacks.class);

   static {
-    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.TRACE);
     GenericTestUtils.setLogLevel(BlockPlacementPolicyRackFaultTolerant.LOG,
-        Level.DEBUG);
+        Level.TRACE);
     GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.DEBUG);
   }

@@ -62,20 +70,38 @@ public class TestErasureCodingMultipleRacks {
   private DistributedFileSystem dfs;

   @Before
-  public void setup() throws Exception {
+  public void setup() {
     ecPolicy = getPolicy();
-    final int dataUnits = ecPolicy.getNumDataUnits();
-    final int parityUnits = ecPolicy.getNumParityUnits();
-    final int numDatanodes = dataUnits + parityUnits;
-    final int numRacks = 2;
+    conf = new HdfsConfiguration();
+    // Disable load consideration to test placement only.
+    conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);
+  }
+
+  /**
+   * Set up a cluster with the desired number of DNs and racks, where a
+   * specified number of racks have only 1 DN each. The remaining DNs are
+   * spread evenly across the other racks.
+   * <p>
+   * This is not done as a {@link Before}, so test cases can set up the
+   * cluster differently.
+   *
+   * @param numDatanodes total number of datanodes
+   * @param numRacks total number of racks
+   * @param numSingleDnRacks number of racks that have only 1 DN
+   * @throws Exception
+   */
+  public void setupCluster(final int numDatanodes, final int numRacks,
+      final int numSingleDnRacks) throws Exception {
+    assert numDatanodes > numRacks;
+    assert numRacks > numSingleDnRacks;
+    assert numSingleDnRacks >= 0;
     final String[] racks = new String[numDatanodes];
-    for (int i = 0; i < numRacks; i++) {
+    for (int i = 0; i < numSingleDnRacks; i++) {
       racks[i] = "/rack" + i;
     }
-    for (int i = numRacks; i < numDatanodes; i++) {
-      racks[i] = "/rack" + (numRacks - 1);
+    for (int i = numSingleDnRacks; i < numDatanodes; i++) {
+      racks[i] =
+          "/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
     }
-    conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(numDatanodes)
         .racks(racks)
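
To make the rack layout concrete, the standalone snippet below replays the same assignment loop for a hypothetical setupCluster(9, 4, 2) call (the arguments are illustrative, not from the test): the first numSingleDnRacks DNs each get a dedicated rack, and the rest are dealt round-robin across the remaining racks.

// Standalone illustration of the rack layout produced by setupCluster().
import java.util.Arrays;

public class RackLayoutDemo {
  static String[] layout(int numDatanodes, int numRacks, int numSingleDnRacks) {
    final String[] racks = new String[numDatanodes];
    // First numSingleDnRacks datanodes each get their own rack.
    for (int i = 0; i < numSingleDnRacks; i++) {
      racks[i] = "/rack" + i;
    }
    // Remaining datanodes are dealt round-robin over the other racks.
    for (int i = numSingleDnRacks; i < numDatanodes; i++) {
      racks[i] =
          "/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
    }
    return racks;
  }

  public static void main(String[] args) {
    // 9 DNs, 4 racks, 2 single-DN racks ->
    // [/rack0, /rack1, /rack2, /rack3, /rack2, /rack3, /rack2, /rack3, /rack2]
    System.out.println(Arrays.toString(layout(9, 4, 2)));
  }
}
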
@@ -92,16 +118,89 @@ public class TestErasureCodingMultipleRacks {
     }
   }

+  // Extreme case: 2 racks, 1 of which has only a single DN.
+  @Test
+  public void testSkewedRack1() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    setupCluster(dataUnits + parityUnits, 2, 1);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+    byte[] contents = new byte[filesize];
+
+    final Path path = new Path("/testfile");
+    LOG.info("Writing file " + path);
+    DFSTestUtil.writeFile(dfs, path, contents);
+    BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+    assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+        blocks[0].getHosts().length);
+  }
+
+  // 1 rack has many nodes, the other racks have a single node. Extreme case.
+  @Test
+  public void testSkewedRack2() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    setupCluster(dataUnits + parityUnits * 2, dataUnits, dataUnits - 1);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+    byte[] contents = new byte[filesize];
+
+    final Path path = new Path("/testfile");
+    LOG.info("Writing file " + path);
+    DFSTestUtil.writeFile(dfs, path, contents);
+    BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+    assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+        blocks[0].getHosts().length);
+  }
+
+  // 2 racks have sufficient nodes, the other racks have 1. Should be able to
+  // tolerate 1 rack failure.
   @Test
-  public void testSkewedRack() throws Exception {
-    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy
-        .getCellSize();
+  public void testSkewedRack3() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    // Create enough extra DNs on the 2 racks to test even placement.
+    // Desired placement is parityUnits replicas on the 2 racks, and 1 replica
+    // on each of the remaining racks (which only have 1 DN).
+    setupCluster(dataUnits + parityUnits * 4, dataUnits - parityUnits + 2,
+        dataUnits - parityUnits);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
     byte[] contents = new byte[filesize];

-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < 10; ++i) {
       final Path path = new Path("/testfile" + i);
       LOG.info("Writing file " + path);
       DFSTestUtil.writeFile(dfs, path, contents);
+      BlockLocation[] blocks =
+          dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+      assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+          blocks[0].getHosts().length);
+      assertRackFailureTolerated(blocks[0].getTopologyPaths());
+    }
+  }
+
+  // Verifies that no more than numParityUnits replicas are placed on any rack.
+  private void assertRackFailureTolerated(final String[] topologies) {
+    final Map<String, Integer> racksCount = new HashMap<>();
+    for (String t : topologies) {
+      final Integer count = racksCount.get(getRackName(t));
+      if (count == null) {
+        racksCount.put(getRackName(t), 1);
+      } else {
+        racksCount.put(getRackName(t), count + 1);
+      }
     }
+    LOG.info("Rack count map is: {}", racksCount);
+
+    for (Integer count : racksCount.values()) {
+      assertTrue(count <= ecPolicy.getNumParityUnits());
+    }
+  }
+
+  private String getRackName(final String topology) {
+    assert topology.indexOf('/', 1) > 0;
+    return topology.substring(0, topology.indexOf('/', 1));
   }
 }
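
As a worked example of the fault-tolerance check, suppose for illustration that getPolicy() returns an RS-6-3 policy (dataUnits = 6, parityUnits = 3); this is an assumed choice, not stated in the patch. testSkewedRack3 then calls setupCluster(6 + 3 * 4, 6 - 3 + 2, 6 - 3) = setupCluster(18, 5, 3): three single-DN racks plus two racks sharing the remaining 15 DNs. The desired placement of the 9-block group is 3 + 3 + 1 + 1 + 1 across the five racks, so no rack holds more than numParityUnits = 3 blocks, and losing any one rack leaves at least 6 blocks, enough to reconstruct the stripe. That bound is exactly what assertRackFailureTolerated verifies from the block's topology paths.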