Author: tgraves
Date: Fri Jan 4 14:28:52 2013
New Revision: 1428883

URL: http://svn.apache.org/viewvc?rev=1428883&view=rev
Log:
HDFS-4270. Replications of the highest priority should be allowed to choose a
source datanode that has reached its max replication limit (Derek Dagit via
tgraves)
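In short: before this change, a datanode that had already queued maxReplicationStreams outgoing replications was never eligible as a replication source, even for blocks in the highest-priority queue (such as blocks down to a single live replica). The patch below waives that soft limit for highest-priority replications and adds a separate hard limit that is always enforced. As a minimal standalone sketch of the rule (illustrative names only, not the actual BlockManager API; the real check is in chooseSourceDatanode in the diff below):

public class SourceLimitSketch {
  // 'pending' stands in for DatanodeDescriptor#getNumberOfBlocksToBeReplicated().
  static boolean eligibleAsSource(int pending, boolean highestPriority,
                                  int softLimit, int hardLimit) {
    if (!highestPriority && pending >= softLimit) {
      return false;             // soft limit: waived only for the highest priority
    }
    return pending < hardLimit; // hard limit: always enforced
  }

  public static void main(String[] args) {
    final int soft = 2, hard = 4; // the shipped defaults
    System.out.println(eligibleAsSource(3, false, soft, hard)); // false: over soft limit
    System.out.println(eligibleAsSource(3, true,  soft, hard)); // true: highest priority
    System.out.println(eligibleAsSource(4, true,  soft, hard)); // false: at hard limit
  }
}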
Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jan 4 14:28:52 2013
@@ -29,6 +29,10 @@ Release 0.23.6 - UNRELEASED
     HDFS-4315. DNs with multiple BPs can have BPOfferServices fail to start
     due to unsynchronized map access. (atm via tgraves)
 
+    HDFS-4270. Replications of the highest priority should be allowed to
+    choose a source datanode that has reached its max replication limit
+    (Derek Dagit via tgraves)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 4 14:28:52 2013
@@ -118,6 +118,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
   public static final String  DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
   public static final int     DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
+  public static final String  DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
+  public static final int     DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
   public static final String  DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
   public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
   public static final String  DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
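The new key sits alongside the existing max-streams key and defaults to 4. A hypothetical snippet showing how it could be overridden programmatically (in a real deployment it would normally be set in hdfs-site.xml instead; the class here is illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class HardLimitConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Raise the hard limit from the default of 4 to 8 outgoing streams.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 8);
    int hardLimit = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
    System.out.println("replication streams hard limit = " + hardLimit); // 8
  }
}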
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jan 4 14:28:52 2013
@@ -156,10 +156,16 @@ public class BlockManager {
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
-  /** The maximum number of outgoing replication streams
-   * a given node should have at one time
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time considering all but the highest priority replications needed.
    */
   int maxReplicationStreams;
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time.
+   */
+  int replicationStreamsHardLimit;
   /** Minimum copies needed or else write is disallowed */
   public final short minReplication;
   /** Default number of replicas */
@@ -219,10 +225,16 @@ public class BlockManager {
     this.minReplication = (short)minR;
     this.maxReplication = (short)maxR;
 
-    this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
-                                             DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
-                                                                                                        : true;
+    this.maxReplicationStreams =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+    this.replicationStreamsHardLimit =
+        conf.getInt(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+    this.shouldCheckForEnoughRacks =
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+            ? false : true;
 
     this.replicationRecheckInterval =
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
@@ -329,7 +341,8 @@ public class BlockManager {
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
     chooseSourceDatanode(block, containingNodes,
-        containingLiveReplicasNodes, numReplicas);
+        containingLiveReplicasNodes, numReplicas,
+        UnderReplicatedBlocks.LEVEL);
     assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
     int usableReplicas = numReplicas.liveReplicas() +
                          numReplicas.decommissionedReplicas();
@@ -1047,9 +1060,11 @@ public class BlockManager {
           liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
           NumberReplicas numReplicas = new NumberReplicas();
           srcNode = chooseSourceDatanode(
-              block, containingNodes, liveReplicaNodes, numReplicas);
+              block, containingNodes, liveReplicaNodes, numReplicas, priority);
           if(srcNode == null) // block can not be replicated from any node
+          {
             return false;
+          }
           assert liveReplicaNodes.size() == numReplicas.liveReplicas();
           // do not schedule more if enough replicas is already pending
@@ -1212,16 +1227,34 @@ public class BlockManager {
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
    * Otherwise we choose a random node among those that did not reach their
-   * replication limit.
+   * replication limits. However, if the replication is of the highest priority
+   * and all nodes have reached their replication limits, we will choose a
+   * random node despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
+   *
+   * @param block Block for which a replication source is needed
+   * @param containingNodes List to be populated with nodes found to contain the
+   *                        given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found to
+   *                                    contain live replicas of the given block
+   * @param numReplicas NumberReplicas instance to be initialized with the
+   *                    counts of live, corrupt, excess, and decommissioned
+   *                    replicas of the given block
+   * @param priority integer representing replication priority of the given
+   *                 block
+   * @return the DatanodeDescriptor of the chosen node from which to replicate
+   *         the given block
    */
-  private DatanodeDescriptor chooseSourceDatanode(
+  @VisibleForTesting
+  DatanodeDescriptor chooseSourceDatanode(
                                     Block block,
                                     List<DatanodeDescriptor> containingNodes,
                                     List<DatanodeDescriptor> nodesContainingLiveReplicas,
-                                    NumberReplicas numReplicas) {
+                                    NumberReplicas numReplicas,
+                                    int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
@@ -1250,8 +1283,15 @@ public class BlockManager {
       // If so, do not select the node as src node
       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
-      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      {
         continue; // already reached replication limit
+      }
+      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+      {
+        continue;
+      }
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
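Note that the two call sites changed above pass different priorities: the replication-scheduling path passes the block's actual queue priority, while the path where (per the comment) the returned source node is unused passes UnderReplicatedBlocks.LEVEL. Assuming the usual constants in this branch, LEVEL is the number of priority queues and can never equal QUEUE_HIGHEST_PRIORITY, so the soft limit remains in force on that path:

// Assumed values from UnderReplicatedBlocks in this branch, shown only to
// explain the call sites above; verify against the actual source.
static final int QUEUE_HIGHEST_PRIORITY = 0; // most urgent replication queue
static final int LEVEL = 5;                  // total number of priority queues
// priority == LEVEL never equals QUEUE_HIGHEST_PRIORITY, so the soft-limit
// 'continue' in chooseSourceDatanode always applies on that path.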
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1428883&r1=1428882&r2=1428883&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Fri Jan 4 14:28:52 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -388,4 +389,57 @@ public class TestBlockManager {
     }
     return repls;
   }
+
+  /**
+   * Test that a source node for a highest-priority replication is chosen even
+   * if all available source nodes have reached their replication limits.
+   */
+  @Test
+  public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
+    bm.maxReplicationStreams = 0;
+    bm.replicationStreamsHardLimit = 1;
+
+    long blockId = 42;  // arbitrary
+    Block aBlock = new Block(blockId, 0, 0);
+
+    List<DatanodeDescriptor> origNodes = nodes(0, 1);
+    // Add the block to the first node.
+    addBlockOnNodes(blockId, origNodes.subList(0, 1));
+
+    List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+
+    assertNotNull("Chooses source node for a highest-priority replication"
+        + " even if all available source nodes have reached their replication"
+        + " limits below the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+
+    assertNull("Does not choose a source node for a less-than-highest-priority"
+        + " replication since all available source nodes have reached"
+        + " their replication limits.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+
+    // Increase the replication count to test replication count > hard limit
+    DatanodeDescriptor targets[] = { origNodes.get(1) };
+    origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
+
+    assertNull("Does not choose a source node for a highest-priority"
+        + " replication when all available nodes exceed the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+  }
 }