HDFS-12567. BlockPlacementPolicyRackFaultTolerant fails with racks with very few nodes.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/644c2f69 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/644c2f69 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/644c2f69 Branch: refs/heads/YARN-5734 Commit: 644c2f6924f341f51d809c91dccfff88fc82f6f0 Parents: c071aad Author: Andrew Wang <[email protected]> Authored: Thu Oct 5 16:58:43 2017 -0700 Committer: Andrew Wang <[email protected]> Committed: Thu Oct 5 16:58:43 2017 -0700 ---------------------------------------------------------------------- .../blockmanagement/BlockPlacementPolicy.java | 2 +- .../BlockPlacementPolicyRackFaultTolerant.java | 49 +++++++-- .../hdfs/TestErasureCodingMultipleRacks.java | 107 +++++++++++++++++++ 3 files changed, 146 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/644c2f69/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 732a2dc..23e3e40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private public abstract class BlockPlacementPolicy { - static final Logger LOG = LoggerFactory.getLogger( + public static final Logger LOG = LoggerFactory.getLogger( BlockPlacementPolicy.class); @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hadoop/blob/644c2f69/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java index c0d981c..1eac3ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java @@ -46,9 +46,12 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD if (numOfRacks == 1 || totalNumOfReplicas <= 1) { return new int[] {numOfReplicas, totalNumOfReplicas}; } - if(totalNumOfReplicas<numOfRacks){ + // If more racks than replicas, put one replica per rack. + if (totalNumOfReplicas < numOfRacks) { return new int[] {numOfReplicas, 1}; } + // If more replicas than racks, evenly spread the replicas. + // This calculation rounds up. int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1; return new int[] {numOfReplicas, maxNodesPerRack}; } @@ -109,18 +112,42 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD numOfReplicas = Math.min(totalReplicaExpected - results.size(), (maxNodesPerRack -1) * numOfRacks - (results.size() - excess)); - // Fill each rack exactly (maxNodesPerRack-1) replicas. - writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes), - blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes); + try { + // Try to spread the replicas as evenly as possible across racks. + // This is done by first placing with (maxNodesPerRack-1), then spreading + // the remainder by calling again with maxNodesPerRack. + writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes), + blocksize, maxNodesPerRack - 1, results, avoidStaleNodes, + storageTypes); - for (DatanodeStorageInfo resultStorage : results) { - addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes); - } + // Exclude the chosen nodes + for (DatanodeStorageInfo resultStorage : results) { + addToExcludedNodes(resultStorage.getDatanodeDescriptor(), + excludedNodes); + } + LOG.trace("Chosen nodes: {}", results); + LOG.trace("Excluded nodes: {}", excludedNodes); - // For some racks, place one more replica to each one of them. - numOfReplicas = totalReplicaExpected - results.size(); - chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes); + numOfReplicas = totalReplicaExpected - results.size(); + chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes, storageTypes); + } catch (NotEnoughReplicasException e) { + LOG.debug("Only able to place {} of {} (maxNodesPerRack={}) nodes " + + "evenly across racks, falling back to uneven placement.", + results.size(), numOfReplicas, maxNodesPerRack); + LOG.debug("Caught exception was:", e); + // Exclude the chosen nodes + for (DatanodeStorageInfo resultStorage : results) { + addToExcludedNodes(resultStorage.getDatanodeDescriptor(), + excludedNodes); + } + + LOG.trace("Chosen nodes: {}", results); + LOG.trace("Excluded nodes: {}", excludedNodes); + numOfReplicas = totalReplicaExpected - results.size(); + chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, + totalReplicaExpected, results, avoidStaleNodes, storageTypes); + } return writer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/644c2f69/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java new file mode 100644 index 0000000..eb6213a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * Test erasure coding block placement with skewed # nodes per rack. + */ +public class TestErasureCodingMultipleRacks { + public static final Logger LOG = + LoggerFactory.getLogger(TestErasureCodingMultipleRacks.class); + + static { + GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(BlockPlacementPolicyRackFaultTolerant.LOG, + Level.DEBUG); + GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.DEBUG); + } + + @Rule + public Timeout globalTimeout = new Timeout(300000); + + public ErasureCodingPolicy getPolicy() { + return StripedFileTestUtil.getDefaultECPolicy(); + } + + private MiniDFSCluster cluster; + private ErasureCodingPolicy ecPolicy; + private Configuration conf; + private DistributedFileSystem dfs; + + @Before + public void setup() throws Exception { + ecPolicy = getPolicy(); + final int dataUnits = ecPolicy.getNumDataUnits(); + final int parityUnits = ecPolicy.getNumParityUnits(); + final int numDatanodes = dataUnits + parityUnits; + final int numRacks = 2; + final String[] racks = new String[numDatanodes]; + for (int i = 0; i < numRacks; i++) { + racks[i] = "/rack" + i; + } + for (int i = numRacks; i < numDatanodes; i++) { + racks[i] = "/rack" + (numRacks - 1); + } + conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDatanodes) + .racks(racks) + .build(); + dfs = cluster.getFileSystem(); + cluster.waitActive(); + dfs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName()); + } + + @After + public void teardown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testSkewedRack() throws Exception { + final int filesize = ecPolicy.getNumDataUnits() * ecPolicy + .getCellSize(); + byte[] contents = new byte[filesize]; + + for (int i = 0; i < 10; i++) { + final Path path = new Path("/testfile" + i); + LOG.info("Writing file " + path); + DFSTestUtil.writeFile(dfs, path, contents); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
