Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 3e70fd9af -> 621a50ebb


HDFS-12725. BlockPlacementPolicyRackFaultTolerant fails with very uneven racks.

(cherry picked from commit e1187bad8e9b9abb54a55b5f9ab8373b0f64e6d7)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/621a50eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/621a50eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/621a50eb

Branch: refs/heads/branch-3.0
Commit: 621a50ebb141cd0b51931fc6f665ff0742505be9
Parents: 3e70fd9
Author: Xiao Chen <x...@apache.org>
Authored: Thu Nov 2 21:51:14 2017 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Thu Nov 2 21:53:20 2017 -0700

----------------------------------------------------------------------
 .../BlockPlacementPolicyRackFaultTolerant.java  |  70 ++++++++--
 .../hdfs/TestErasureCodingMultipleRacks.java    | 131 ++++++++++++++++---
 2 files changed, 173 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/621a50eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index 1eac3ea..95c5c88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -62,10 +62,17 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
    * randomly.
    * 2. If total replica expected is bigger than numOfRacks, it choose:
    *  2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
-   *  2b. For some random racks, place one more replica to each one of them, until
-   *  numOfReplicas have been chosen. <br>
-   * In the end, the difference of the numbers of replicas for each two racks
-   * is no more than 1.
+   *  2b. For some random racks, place one more replica to each one of them,
+   *  until numOfReplicas have been chosen. <br>
+   * 3. If after step 2 there are still replicas not placed (because some
+   * racks have fewer datanodes than maxNodesPerRack), the remaining replicas
+   * are placed evenly on the remaining racks that still have datanodes
+   * without a replica.
+   * 4. If after step 3 there are still replicas not placed, a
+   * {@link NotEnoughReplicasException} is thrown.
+   * <p>
+   * For normal setups, step 2 suffices, so in the end the difference between
+   * the replica counts of any two racks is no more than 1.
    * Either way it always prefer local storage.
    * @return local node of writer
    */
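
To make the placement steps described in the hunk above concrete, here is a minimal, self-contained sketch of the relaxation idea in steps 2-3: honor the per-rack cap first, then relax it for racks that still have free datanodes. This is illustration only; RackQuotaSketch and distribute are invented names and do not correspond to HDFS code.

import java.util.Arrays;

public class RackQuotaSketch {
  // Distribute "replicas" across racks of the given sizes: cap every rack at
  // maxNodesPerRack first, then relax the cap for racks that still have free
  // datanodes (steps 2-3 above); give up only when every datanode is used.
  static int[] distribute(int[] rackSizes, int replicas, int maxNodesPerRack) {
    int[] placed = new int[rackSizes.length];
    int remaining = replicas;
    int cap = maxNodesPerRack;
    while (remaining > 0) {
      boolean progress = false;
      for (int i = 0; i < rackSizes.length && remaining > 0; i++) {
        if (placed[i] < Math.min(cap, rackSizes[i])) {
          placed[i]++;
          remaining--;
          progress = true;
        }
      }
      if (!progress) {
        if (cap >= Arrays.stream(rackSizes).max().orElse(0)) {
          // Step 4: no rack has an unused datanode left.
          throw new IllegalStateException("not enough datanodes for " + replicas);
        }
        cap++;  // Step 3: fall back to racks that still have free datanodes.
      }
    }
    return placed;
  }

  public static void main(String[] args) {
    // One 7-DN rack plus two single-DN racks, 9 replicas, initial cap of 4.
    System.out.println(Arrays.toString(distribute(new int[] {7, 1, 1}, 9, 4)));
  }
}

With one 7-DN rack and two single-DN racks this prints [7, 1, 1]: the placement degrades gracefully instead of failing outright, which is the behavior the fallback in this commit aims for (at the cost of rack-level fault tolerance in such extreme layouts).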
@@ -132,24 +139,63 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
       chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
-      LOG.debug("Only able to place {} of {} (maxNodesPerRack={}) nodes " +
-              "evenly across racks, falling back to uneven placement.",
-          results.size(), numOfReplicas, maxNodesPerRack);
+      LOG.warn("Only able to place {} of total expected {}"
+              + " (maxNodesPerRack={}, numOfReplicas={}) nodes "
+              + "evenly across racks, falling back to evenly place on the "
+              + "remaining racks. This may not guarantee rack-level fault "
+              + "tolerance. Please check if the racks are configured 
properly.",
+          results.size(), totalReplicaExpected, maxNodesPerRack, 
numOfReplicas);
       LOG.debug("Caught exception was:", e);
+      chooseEvenlyFromRemainingRacks(writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes,
+          totalReplicaExpected, e);
+
+    }
+
+    return writer;
+  }
+
+  /**
+   * Choose as evenly as possible from the racks which have available datanodes.
+   */
+  private void chooseEvenlyFromRemainingRacks(Node writer,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, int totalReplicaExpected,
+      NotEnoughReplicasException e) throws NotEnoughReplicasException {
+    int numResultsOflastChoose = 0;
+    NotEnoughReplicasException lastException = e;
+    int bestEffortMaxNodesPerRack = maxNodesPerRack;
+    while (results.size() != totalReplicaExpected &&
+        numResultsOflastChoose != results.size()) {
       // Exclude the chosen nodes
+      final Set<Node> newExcludeNodes = new HashSet<>();
       for (DatanodeStorageInfo resultStorage : results) {
         addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
-            excludedNodes);
+            newExcludeNodes);
       }
 
       LOG.trace("Chosen nodes: {}", results);
       LOG.trace("Excluded nodes: {}", excludedNodes);
-      numOfReplicas = totalReplicaExpected - results.size();
-      chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
-          totalReplicaExpected, results, avoidStaleNodes, storageTypes);
+      LOG.trace("New Excluded nodes: {}", newExcludeNodes);
+      final int numOfReplicas = totalReplicaExpected - results.size();
+      numResultsOflastChoose = results.size();
+      try {
+        chooseOnce(numOfReplicas, writer, newExcludeNodes, blocksize,
+            ++bestEffortMaxNodesPerRack, results, avoidStaleNodes,
+            storageTypes);
+      } catch (NotEnoughReplicasException nere) {
+        lastException = nere;
+      } finally {
+        excludedNodes.addAll(newExcludeNodes);
+      }
     }
 
-    return writer;
+    if (numResultsOflastChoose != totalReplicaExpected) {
+      LOG.debug("Best effort placement failed: expecting {} replicas, only "
+          + "chose {}.", totalReplicaExpected, numResultsOflastChoose);
+      throw lastException;
+    }
   }
 
   /**
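
A property worth noting in chooseEvenlyFromRemainingRacks is its termination condition: the loop keeps relaxing bestEffortMaxNodesPerRack but stops as soon as a pass adds no new result, so it cannot spin forever on an undersized cluster. A stripped-down sketch of that loop shape follows; tryOnePass is an invented stand-in for chooseOnce() and the numbers are illustrative only.

import java.util.ArrayList;
import java.util.List;

public class BestEffortLoopSketch {
  // Invented stand-in for chooseOnce(): adds results up to min(cap, capacity).
  static void tryOnePass(List<Integer> results, int cap, int capacity) {
    while (results.size() < Math.min(cap, capacity)) {
      results.add(results.size());
    }
  }

  public static void main(String[] args) {
    final int totalReplicaExpected = 9;
    final int capacity = 7;   // pretend the cluster only has 7 usable datanodes
    final List<Integer> results = new ArrayList<>();
    int numResultsOfLastChoose = -1;
    int bestEffortMaxNodesPerRack = 4;
    while (results.size() != totalReplicaExpected
        && numResultsOfLastChoose != results.size()) {
      numResultsOfLastChoose = results.size();
      tryOnePass(results, ++bestEffortMaxNodesPerRack, capacity);
    }
    // Prints 7: the loop stopped after a pass made no progress, and the caller
    // would then rethrow the saved NotEnoughReplicasException (step 4).
    System.out.println(results.size());
  }
}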

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621a50eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
index eb6213a..0689665d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
@@ -34,6 +35,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test erasure coding block placement with skewed # nodes per rack.
  */
@@ -42,10 +50,10 @@ public class TestErasureCodingMultipleRacks {
       LoggerFactory.getLogger(TestErasureCodingMultipleRacks.class);
 
   static {
-    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.TRACE);
     GenericTestUtils.setLogLevel(BlockPlacementPolicyRackFaultTolerant.LOG,
-        Level.DEBUG);
+        Level.TRACE);
     GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.DEBUG);
   }
 
@@ -62,20 +70,38 @@ public class TestErasureCodingMultipleRacks {
   private DistributedFileSystem dfs;
 
   @Before
-  public void setup() throws Exception {
+  public void setup() {
     ecPolicy = getPolicy();
-    final int dataUnits = ecPolicy.getNumDataUnits();
-    final int parityUnits = ecPolicy.getNumParityUnits();
-    final int numDatanodes = dataUnits + parityUnits;
-    final int numRacks = 2;
+    conf = new HdfsConfiguration();
+    // disable load consideration to test placement only.
+    conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);
+  }
+
+  /**
+   * Set up a cluster with the desired number of DNs and racks, where the
+   * specified number of racks have only 1 DN each. The remaining DNs are
+   * spread evenly across the other racks.
+   * <p>
+   * This is not done as a {@link Before}, so test cases can set up differently.
+   *
+   * @param numDatanodes total number of Datanodes.
+   * @param numRacks total number of racks.
+   * @param numSingleDnRacks number of racks that have only 1 DN.
+   * @throws Exception
+   */
+  public void setupCluster(final int numDatanodes, final int numRacks,
+      final int numSingleDnRacks) throws Exception {
+    assert numDatanodes > numRacks;
+    assert numRacks > numSingleDnRacks;
+    assert numSingleDnRacks >= 0;
     final String[] racks = new String[numDatanodes];
-    for (int i = 0; i < numRacks; i++) {
+    for (int i = 0; i < numSingleDnRacks; i++) {
       racks[i] = "/rack" + i;
     }
-    for (int i = numRacks; i < numDatanodes; i++) {
-      racks[i] = "/rack" + (numRacks - 1);
+    for (int i = numSingleDnRacks; i < numDatanodes; i++) {
+      racks[i] =
+          "/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
     }
-    conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(numDatanodes)
         .racks(racks)
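
For a concrete picture of the layout setupCluster builds, here is a standalone re-run of the rack-assignment loop above with example values (9 DNs, 3 racks, 1 single-DN rack; the numbers are chosen for illustration, not taken from the tests).

import java.util.Arrays;

public class RackLayoutDemo {
  public static void main(String[] args) {
    final int numDatanodes = 9, numRacks = 3, numSingleDnRacks = 1;
    final String[] racks = new String[numDatanodes];
    for (int i = 0; i < numSingleDnRacks; i++) {
      racks[i] = "/rack" + i;                       // single-DN racks first
    }
    for (int i = numSingleDnRacks; i < numDatanodes; i++) {
      // Remaining DNs are assigned round-robin to the other racks.
      racks[i] = "/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
    }
    // Prints [/rack0, /rack2, /rack1, /rack2, /rack1, /rack2, /rack1, /rack2, /rack1]:
    // rack0 keeps a single DN while rack1 and rack2 get 4 DNs each.
    System.out.println(Arrays.toString(racks));
  }
}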
@@ -92,16 +118,89 @@ public class TestErasureCodingMultipleRacks {
     }
   }
 
+  // Extreme case: 2 racks, one of which has only a single DN.
+  @Test
+  public void testSkewedRack1() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    setupCluster(dataUnits + parityUnits, 2, 1);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+    byte[] contents = new byte[filesize];
+
+    final Path path = new Path("/testfile");
+    LOG.info("Writing file " + path);
+    DFSTestUtil.writeFile(dfs, path, contents);
+    BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+    assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+        blocks[0].getHosts().length);
+  }
+
+  // 1 rack has many nodes, the other racks have a single node each. Extreme case.
+  @Test
+  public void testSkewedRack2() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    setupCluster(dataUnits + parityUnits * 2, dataUnits, dataUnits - 1);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+    byte[] contents = new byte[filesize];
+
+    final Path path = new Path("/testfile");
+    LOG.info("Writing file " + path);
+    DFSTestUtil.writeFile(dfs, path, contents);
+    BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+    assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+        blocks[0].getHosts().length);
+  }
+
+  // 2 racks have sufficient nodes, the other racks have 1 each. Should be
+  // able to tolerate 1 rack failure.
   @Test
-  public void testSkewedRack() throws Exception {
-    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy
-        .getCellSize();
+  public void testSkewedRack3() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    // Create enough extra DNs on the 2 racks to test even placement.
+    // Desired placement is parityUnits replicas on each of the 2 racks, and
+    // 1 replica on each of the remaining racks (which only have 1 DN each).
+    setupCluster(dataUnits + parityUnits * 4, dataUnits - parityUnits + 2,
+        dataUnits - parityUnits);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
     byte[] contents = new byte[filesize];
 
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < 10; ++i) {
       final Path path = new Path("/testfile" + i);
       LOG.info("Writing file " + path);
       DFSTestUtil.writeFile(dfs, path, contents);
+      BlockLocation[] blocks =
+          dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+      assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+          blocks[0].getHosts().length);
+      assertRackFailureTolerated(blocks[0].getTopologyPaths());
+    }
+  }
+
+  // Verifies that no more than numParityUnits replicas are placed on any rack.
+  private void assertRackFailureTolerated(final String[] topologies) {
+    final Map<String, Integer> racksCount = new HashMap<>();
+    for (String t : topologies) {
+      final Integer count = racksCount.get(getRackName(t));
+      if (count == null) {
+        racksCount.put(getRackName(t), 1);
+      } else {
+        racksCount.put(getRackName(t), count + 1);
+      }
     }
+    LOG.info("Rack count map is: {}", racksCount);
+
+    for (Integer count : racksCount.values()) {
+      assertTrue(count <= ecPolicy.getNumParityUnits());
+    }
+  }
+
+  private String getRackName(final String topology) {
+    assert topology.indexOf('/', 1) > 0;
+    return topology.substring(0, topology.indexOf('/', 1));
   }
 }
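
The assertion above encodes the fault-tolerance argument: with an RS(k, m) policy a stripe can be rebuilt from any k of its k + m blocks, so if no rack holds more than m blocks, losing a whole rack is survivable. A tiny worked check of that arithmetic, using RS(6, 3) purely as an example (the concrete policy is whatever getPolicy() returns in the tests):

public class RackToleranceDemo {
  public static void main(String[] args) {
    final int dataUnits = 6;                          // k, e.g. RS(6,3)
    final int parityUnits = 3;                        // m
    final int totalBlocks = dataUnits + parityUnits;  // 9 blocks per stripe
    final int worstRackLoss = parityUnits;            // a failed rack holds <= m blocks
    // true: at least k blocks survive, so the stripe can still be reconstructed.
    System.out.println(totalBlocks - worstRackLoss >= dataUnits);
  }
}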

