Author: szetszwo
Date: Sat May  3 11:02:44 2014
New Revision: 1592179

URL: http://svn.apache.org/r1592179
Log:
HDFS-5168. Add cross node dependency support to BlockPlacementPolicy.  Contributed by Nikola Vujic
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat May  3 11:02:44 2014
@@ -267,6 +267,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6281. Provide option to use the NFS Gateway without having to use the
     Hadoop portmapper. (atm)
 
+    HDFS-5168. Add cross node dependency support to BlockPlacementPolicy.
+    (Nikola Vujic via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-6007. Update documentation about short-circuit local reads (iwasakims
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Sat May  3 11:02:44 2014
@@ -29,6 +29,8 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.Time;
 
 import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
 
@@ -50,6 +52,8 @@ public class DatanodeInfo extends Datano
   private int xceiverCount;
   private String location = NetworkTopology.DEFAULT_RACK;
   private String softwareVersion;
+  private List<String> dependentHostNames = new LinkedList<String>();
+
 
   // Datanode administrative states
   public enum AdminStates {
@@ -274,6 +278,21 @@ public class DatanodeInfo extends Datano
   public synchronized void setNetworkLocation(String location) {
     this.location = NodeBase.normalize(location);
   }
+
+  /** Add a hostname to a list of network dependencies */
+  public void addDependentHostName(String hostname) {
+    dependentHostNames.add(hostname);
+  }
+
+  /** List of Network dependencies */
+  public List<String> getDependentHostNames() {
+    return dependentHostNames;
+  }
+
+  /** Sets the network dependencies */
+  public void setDependentHostNames(List<String> dependencyList) {
+    dependentHostNames = dependencyList;
+  }
 
   /** A formatted string for reporting the status of the DataNode. */
   public String getDatanodeReport() {
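For illustration, a minimal sketch exercising the new DatanodeInfo dependency accessors. The DatanodeID construction mirrors the DFSTestUtil helper added later in this patch; the public DatanodeInfo(DatanodeID) constructor is assumed, and the hostnames h1/h2/h3 are made up:

    import java.util.Arrays;
    import java.util.UUID;

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class DependencyAccessorSketch {
      public static void main(String[] args) {
        DatanodeID dnId = new DatanodeID("1.1.1.1", "h1",
            UUID.randomUUID().toString(),
            DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
            DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
            DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
            DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
        DatanodeInfo info = new DatanodeInfo(dnId);

        // Record a node that shares a failure domain with h1 (e.g. a second
        // datanode VM on the same physical host):
        info.addDependentHostName("h2");
        System.out.println(info.getDependentHostNames());  // prints [h2]

        // Or replace the whole list, as DatanodeManager does at registration:
        info.setDependentHostNames(Arrays.asList("h2", "h3"));
      }
    }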
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sat May  3 11:02:44 2014
@@ -842,7 +842,7 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof
+    if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof
         BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Sat May  3 11:02:44 2014
@@ -267,7 +267,8 @@ public class BlockManager {
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, stats, datanodeManager.getNetworkTopology());
+        conf, stats, datanodeManager.getNetworkTopology(),
+        datanodeManager.getHost2DatanodeMap());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Sat May  3 11:02:44 2014
@@ -139,7 +139,8 @@ public abstract class BlockPlacementPoli
    * @param clusterMap cluster topology
    */
   abstract protected void initialize(Configuration conf, FSClusterStats stats,
-                                     NetworkTopology clusterMap);
+                                     NetworkTopology clusterMap,
+                                     Host2NodesMap host2datanodeMap);
 
   /**
    * Get an instance of the configured Block Placement Policy based on the
@@ -153,14 +154,15 @@ public abstract class BlockPlacementPoli
    */
   public static BlockPlacementPolicy getInstance(Configuration conf,
                                                  FSClusterStats stats,
-                                                 NetworkTopology clusterMap) {
+                                                 NetworkTopology clusterMap,
+                                                 Host2NodesMap host2datanodeMap) {
     final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
         BlockPlacementPolicy.class);
     final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
         replicatorClass, conf);
-    replicator.initialize(conf, stats, clusterMap);
+    replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
     return replicator;
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Sat May  3 11:02:44 2014
@@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault
   protected boolean considerLoad;
   private boolean preferLocalNode = true;
   protected NetworkTopology clusterMap;
+  protected Host2NodesMap host2datanodeMap;
   private FSClusterStats stats;
   protected long heartbeatInterval;   // interval for DataNode heartbeats
   private long staleInterval;   // interval used to identify stale DataNodes
@@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault
   protected int tolerateHeartbeatMultiplier;
 
   protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
-                           NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+                           NetworkTopology clusterMap,
+                           Host2NodesMap host2datanodeMap) {
+    initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   protected BlockPlacementPolicyDefault() {
@@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault
 
   @Override
   public void initialize(Configuration conf, FSClusterStats stats,
-                         NetworkTopology clusterMap) {
+                         NetworkTopology clusterMap,
+                         Host2NodesMap host2datanodeMap) {
     this.considerLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.host2datanodeMap = host2datanodeMap;
     this.heartbeatInterval = conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
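Any subclass now receives the Host2NodesMap through the widened initialize() contract. A minimal sketch of a custom policy picking it up; the class name is invented, and it must live in the blockmanagement package because Host2NodesMap is package-private:

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;

    public class AuditingPlacementPolicy extends BlockPlacementPolicyDefault {
      @Override
      public void initialize(Configuration conf, FSClusterStats stats,
          NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
        super.initialize(conf, stats, clusterMap, host2datanodeMap);
        // After the super call, the protected host2datanodeMap field is set,
        // so subclasses can resolve dependent hostnames to descriptors.
      }
    }

Such a policy would be selected through the existing dfs.block.replicator.classname key (DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY above).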
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Sat May  3 11:02:44 2014
@@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase;
 public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
 
   protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
-      NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+      NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
+    initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   protected BlockPlacementPolicyWithNodeGroup() {
@@ -56,8 +56,9 @@ public class BlockPlacementPolicyWithNod
 
   @Override
   public void initialize(Configuration conf, FSClusterStats stats,
-          NetworkTopology clusterMap) {
-    super.initialize(conf, stats, clusterMap);
+          NetworkTopology clusterMap,
+          Host2NodesMap host2datanodeMap) {
+    super.initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   /** choose local node of localMachine as the target.
@@ -241,6 +242,36 @@ public class BlockPlacementPolicyWithNod
         countOfExcludedNodes++;
       }
     }
+
+    countOfExcludedNodes += addDependentNodesToExcludedNodes(
+        chosenNode, excludedNodes);
+    return countOfExcludedNodes;
+  }
+
+  /**
+   * Add all nodes from a dependent nodes list to excludedNodes.
+   * @return number of new excluded nodes
+   */
+  private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode,
+      Set<Node> excludedNodes) {
+    if (this.host2datanodeMap == null) {
+      return 0;
+    }
+    int countOfExcludedNodes = 0;
+    for(String hostname : chosenNode.getDependentHostNames()) {
+      DatanodeDescriptor node =
+          this.host2datanodeMap.getDataNodeByHostName(hostname);
+      if(node!=null) {
+        if (excludedNodes.add(node)) {
+          countOfExcludedNodes++;
+        }
+      } else {
+        LOG.warn("Not able to find datanode " + hostname
+            + " which has dependency with datanode "
+            + chosenNode.getHostName());
+      }
+    }
+
     return countOfExcludedNodes;
   }
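The effect of addDependentNodesToExcludedNodes() is that once a target is chosen, every host it depends on becomes ineligible for further replicas of the same block. A self-contained model of that rule using plain collections (hostnames invented; this is an illustration of the semantics, not the HDFS code path):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class DependencyExclusionSketch {
      public static void main(String[] args) {
        // h1 and h2 declare each other as dependencies, e.g. two datanodes
        // backed by the same physical machine.
        Map<String, List<String>> deps = new HashMap<String, List<String>>();
        deps.put("h1", Arrays.asList("h2"));
        deps.put("h2", Arrays.asList("h1"));

        Set<String> excluded = new HashSet<String>();
        String chosen = "h1";            // placement picked h1 as a target
        excluded.add(chosen);
        List<String> dependents = deps.get(chosen);
        if (dependents != null) {
          excluded.addAll(dependents);   // h2 may no longer get a replica
        }
        System.out.println(excluded);    // prints [h1, h2]
      }
    }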
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Sat May  3 11:02:44 2014
@@ -373,6 +373,11 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
   }
 
+  /** @return the Host2NodesMap */
+  public Host2NodesMap getHost2DatanodeMap() {
+    return this.host2DatanodeMap;
+  }
+
   /**
    * Given datanode address or host name, returns the DatanodeDescriptor for the
    * same, or if it doesn't find the datanode, it looks for a machine local and
@@ -678,6 +683,52 @@ public class DatanodeManager {
   }
 
   /**
+   * Resolve a node's dependencies in the network. If the DNS to switch
+   * mapping fails, this method returns an empty list of dependencies.
+   * @param node the node to get dependencies for
+   * @return list of dependent host names
+   */
+  private List<String> getNetworkDependenciesWithDefault(DatanodeInfo node) {
+    List<String> dependencies;
+    try {
+      dependencies = getNetworkDependencies(node);
+    } catch (UnresolvedTopologyException e) {
+      LOG.error("Unresolved dependency mapping for host "
+          + node.getHostName() + ". Continuing with an empty dependency list");
+      dependencies = Collections.emptyList();
+    }
+    return dependencies;
+  }
+
+  /**
+   * Resolves a node's dependencies in the network. If the DNS to switch
+   * mapping fails to get dependencies, then this method throws an
+   * UnresolvedTopologyException.
+   * @param node the node to get dependencies for
+   * @return list of dependent host names
+   * @throws UnresolvedTopologyException if the DNS to switch mapping fails
+   */
+  private List<String> getNetworkDependencies(DatanodeInfo node)
+      throws UnresolvedTopologyException {
+    List<String> dependencies = Collections.emptyList();
+
+    if (dnsToSwitchMapping instanceof DNSToSwitchMappingWithDependency) {
+      //Get dependencies
+      dependencies =
+          ((DNSToSwitchMappingWithDependency)dnsToSwitchMapping).getDependency(
+              node.getHostName());
+      if(dependencies == null) {
+        LOG.error("The dependency call returned null for host "
            + node.getHostName());
+        throw new UnresolvedTopologyException("The dependency call returned "
+            + "null for host " + node.getHostName());
+      }
+    }
+
+    return dependencies;
+  }
+
+  /**
   * Remove an already decommissioned data node who is neither in include nor
   * exclude hosts lists from the list of live or dead nodes. This is used
   * to not display an already decommissioned data node to the operators.
@@ -869,12 +920,14 @@ public class DatanodeManager {
         nodeS.setDisallowed(false); // Node is in the include list
 
         // resolve network location
-        if(this.rejectUnresolvedTopologyDN)
-        {
-          nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+        if(this.rejectUnresolvedTopologyDN) {
+          nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+          nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
         } else {
           nodeS.setNetworkLocation(
              resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
+          nodeS.setDependentHostNames(
+              getNetworkDependenciesWithDefault(nodeS));
         }
         getNetworkTopology().add(nodeS);
@@ -900,9 +953,12 @@ public class DatanodeManager {
       // resolve network location
       if(this.rejectUnresolvedTopologyDN) {
         nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
+        nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
       } else {
         nodeDescr.setNetworkLocation(
            resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
+        nodeDescr.setDependentHostNames(
+            getNetworkDependenciesWithDefault(nodeDescr));
       }
       networktopology.add(nodeDescr);
       nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
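Dependencies only flow into registration when the configured topology plugin implements DNSToSwitchMappingWithDependency. A hedged sketch of such a plugin; the class name and the static h1/h2 pairing are invented (a real deployment would derive this from a script or table, along the lines of Hadoop's script-based dependency mapping), and the reload hooks simply cover the DNSToSwitchMapping cache-refresh methods in this era of the interface:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.net.DNSToSwitchMappingWithDependency;

    public class StaticPairMapping implements DNSToSwitchMappingWithDependency {
      @Override
      public List<String> resolve(List<String> names) {
        List<String> racks = new ArrayList<String>(names.size());
        for (int i = 0; i < names.size(); i++) {
          racks.add("/default-rack");   // rack topology is out of scope here
        }
        return racks;
      }

      @Override
      public List<String> getDependency(String name) {
        // Report the partner host that shares hardware with this one.
        // Never return null: getNetworkDependencies() above treats null
        // as an UnresolvedTopologyException.
        if ("h1".equals(name)) return Collections.singletonList("h2");
        if ("h2".equals(name)) return Collections.singletonList("h1");
        return Collections.emptyList();
      }

      public void reloadCachedMappings() {}

      public void reloadCachedMappings(List<String> names) {}
    }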
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Sat May  3 11:02:44 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 class Host2NodesMap {
+  private HashMap<String, String> mapHost = new HashMap<String, String>();
   private final HashMap<String, DatanodeDescriptor[]> map
     = new HashMap<String, DatanodeDescriptor[]>();
   private final ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
@@ -69,6 +70,10 @@ class Host2NodesMap {
     }
 
     String ipAddr = node.getIpAddr();
+    String hostname = node.getHostName();
+
+    mapHost.put(hostname, ipAddr);
+
     DatanodeDescriptor[] nodes = map.get(ipAddr);
     DatanodeDescriptor[] newNodes;
     if (nodes==null) {
@@ -95,6 +100,7 @@ class Host2NodesMap {
     }
 
     String ipAddr = node.getIpAddr();
+    String hostname = node.getHostName();
     hostmapLock.writeLock().lock();
     try {
 
@@ -105,6 +111,8 @@ class Host2NodesMap {
       if (nodes.length==1) {
         if (nodes[0]==node) {
           map.remove(ipAddr);
+          //remove hostname key since last datanode is removed
+          mapHost.remove(hostname);
           return true;
         } else {
           return false;
@@ -188,12 +196,40 @@ class Host2NodesMap {
     }
   }
 
+
+  /** get a data node by its hostname. This should be used if only one
+   * datanode service is running on a hostname. If multiple datanodes
+   * are running on a hostname then use methods getDataNodeByXferAddr and
+   * getDataNodeByHostNameAndPort.
+   * @return DatanodeDescriptor if found; otherwise null.
+   */
+  DatanodeDescriptor getDataNodeByHostName(String hostname) {
+    if(hostname == null) {
+      return null;
+    }
+
+    hostmapLock.readLock().lock();
+    try {
+      String ipAddr = mapHost.get(hostname);
+      if(ipAddr == null) {
+        return null;
+      } else {
+        return getDatanodeByHost(ipAddr);
+      }
+    } finally {
+      hostmapLock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
         .append("[");
-    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
-      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    for(Map.Entry<String, String> host: mapHost.entrySet()) {
+      DatanodeDescriptor[] e = map.get(host.getValue());
+      b.append("\n  " + host.getKey() + " => " + host.getValue() + " => "
+          + Arrays.asList(e));
     }
     return b.append("\n]").toString();
   }
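The new lookup is a two-hop resolution: hostname to IP via mapHost, then IP to descriptor via the existing map. A minimal sketch of the flow (placed in the blockmanagement package since Host2NodesMap is package-private; it assumes the class's implicit no-arg constructor and uses the test helper added later in this patch, so it is test-scope code with made-up host data):

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import org.apache.hadoop.hdfs.DFSTestUtil;

    public class Host2NodesMapSketch {
      public static void main(String[] args) {
        Host2NodesMap map = new Host2NodesMap();
        DatanodeDescriptor dn =
            DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1", "h1");
        map.add(dn);

        // hostname -> ip -> descriptor
        System.out.println(map.getDataNodeByHostName("h1"));  // dn
        System.out.println(map.getDataNodeByHostName("h9"));  // null
      }
    }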
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Sat May  3 11:02:44 2014
@@ -172,8 +172,10 @@ public class NamenodeFsck {
     this.minReplication = minReplication;
     this.remoteAddress = remoteAddress;
     this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
-        networktopology);
-
+        networktopology,
+        namenode.getNamesystem().getBlockManager().getDatanodeManager()
+            .getHost2DatanodeMap());
+
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
       if (key.equals("path")) { this.path = pmap.get("path")[0]; }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Sat May  3 11:02:44 2014
@@ -897,29 +897,47 @@ public class DFSTestUtil {
     return getDatanodeDescriptor(ipAddr,
         DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
   }
+
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, String hostname) {
+    return getDatanodeDescriptor(ipAddr,
+        DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
+  }
 
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip) {
-    return createDatanodeStorageInfo(storageID, ip, "defaultRack");
+    return createDatanodeStorageInfo(storageID, ip, "defaultRack", "host");
   }
+
   public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) {
-    return createDatanodeStorageInfos(racks.length, racks);
+    return createDatanodeStorageInfos(racks, null);
+  }
+
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks, String[] hostnames) {
+    return createDatanodeStorageInfos(racks.length, racks, hostnames);
+  }
+
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) {
+    return createDatanodeStorageInfos(n, null, null);
   }
-  public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, String... racks) {
+
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(
+      int n, String[] racks, String[] hostnames) {
     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
     for(int i = storages.length; i > 0; ) {
       final String storageID = "s" + i;
       final String ip = i + "." + i + "." + i + "." + i;
       i--;
-      final String rack = i < racks.length? racks[i]: "defaultRack";
-      storages[i] = createDatanodeStorageInfo(storageID, ip, rack);
+      final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack";
+      final String hostname = (hostnames!=null && i < hostnames.length)?
+          hostnames[i]: "host";
+      storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname);
     }
     return storages;
   }
 
   public static DatanodeStorageInfo createDatanodeStorageInfo(
-      String storageID, String ip, String rack) {
+      String storageID, String ip, String rack, String hostname) {
     final DatanodeStorage storage = new DatanodeStorage(storageID);
-    final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage);
+    final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname);
     return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
   }
+
   public static DatanodeDescriptor[] toDatanodeDescriptor(
@@ -932,8 +950,8 @@ public class DFSTestUtil {
   }
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
-      int port, String rackLocation) {
-    DatanodeID dnId = new DatanodeID(ipAddr, "host",
+      int port, String rackLocation, String hostname) {
+    DatanodeID dnId = new DatanodeID(ipAddr, hostname,
         UUID.randomUUID().toString(), port,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
@@ -941,6 +959,11 @@ public class DFSTestUtil {
     return new DatanodeDescriptor(dnId, rackLocation);
   }
 
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      int port, String rackLocation) {
+    return getDatanodeDescriptor(ipAddr, port, rackLocation, "host");
+  }
+
   public static DatanodeRegistration getLocalDatanodeRegistration() {
     return new DatanodeRegistration(getLocalDatanodeID(), new StorageInfo(
         NodeType.DATA_NODE), new ExportedBlockKeys(), VersionInfo.getVersion());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Sat May  3 11:02:44 2014
@@ -236,8 +236,13 @@ public class BlockManagerTestUtil {
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
       String rackLocation, DatanodeStorage storage) {
+    return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host");
+  }
+
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, DatanodeStorage storage, String hostname) {
       DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
-          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
+          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
       if (storage != null) {
         dn.updateStorage(storage);
       }
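For context, a small sketch of the extended test helpers; the rack and hostname arrays are positional and invented here, and shorter hostname arrays fall back to the "host" default, as the code above shows:

    package org.apache.hadoop.hdfs;

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

    public class StorageInfoSketch {
      public static void main(String[] args) {
        String[] racks = { "/d1/r1/n1", "/d1/r1/n1", "/d1/r1/n2" };
        String[] hosts = { "h1", "h2", "h3" };
        DatanodeStorageInfo[] storages =
            DFSTestUtil.createDatanodeStorageInfos(racks, hosts);
        DatanodeDescriptor[] nodes =
            DFSTestUtil.toDatanodeDescriptor(storages);
        System.out.println(nodes[0].getHostName());  // prints h1
      }
    }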
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java Sat May  3 11:02:44 2014
@@ -47,11 +47,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 public class TestReplicationPolicyWithNodeGroup {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
   private static final int NUM_OF_DATANODES_BOUNDARY = 6;
   private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
+  private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
   private final Configuration CONF = new HdfsConfiguration();
   private NetworkTopology cluster;
   private NameNode namenode;
@@ -113,7 +115,33 @@ public class TestReplicationPolicyWithNo
   private final static DatanodeDescriptor NODE =
       new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
-
+
+  private static final DatanodeStorageInfo[] storagesForDependencies;
+  private static final DatanodeDescriptor[] dataNodesForDependencies;
+  static {
+    final String[] racksForDependencies = {
+        "/d1/r1/n1",
+        "/d1/r1/n1",
+        "/d1/r1/n2",
+        "/d1/r1/n2",
+        "/d1/r1/n3",
+        "/d1/r1/n4"
+    };
+    final String[] hostNamesForDependencies = {
+        "h1",
+        "h2",
+        "h3",
+        "h4",
+        "h5",
+        "h6"
+    };
+
+    storagesForDependencies = DFSTestUtil.createDatanodeStorageInfos(
+        racksForDependencies, hostNamesForDependencies);
+    dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
+
+  };
+
   @Before
   public void setUp() throws Exception {
     FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
@@ -720,5 +748,63 @@ public class TestReplicationPolicyWithNo
     assertEquals(targets.length, 6);
   }
 
-
+  @Test
+  public void testChooseTargetWithDependencies() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
+      if (cluster.contains(node)) {
+        cluster.remove(node);
+      }
+    }
+
+    Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
+        .getBlockManager()
+        .getDatanodeManager().getHost2DatanodeMap();
+    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      cluster.add(dataNodesForDependencies[i]);
+      host2DatanodeMap.add(dataNodesForDependencies[i]);
+    }
+
+    //add dependencies (node1 <-> node2, and node3 <-> node4)
+    dataNodesForDependencies[1].addDependentHostName(
+        dataNodesForDependencies[2].getHostName());
+    dataNodesForDependencies[2].addDependentHostName(
+        dataNodesForDependencies[1].getHostName());
+    dataNodesForDependencies[3].addDependentHostName(
+        dataNodesForDependencies[4].getHostName());
+    dataNodesForDependencies[4].addDependentHostName(
+        dataNodesForDependencies[3].getHostName());
+
+    //Update heartbeat
+    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      updateHeartbeatWithUsage(dataNodesForDependencies[i],
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
+    }
+
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+
+    DatanodeStorageInfo[] targets;
+    Set<Node> excludedNodes = new HashSet<Node>();
+    excludedNodes.add(dataNodesForDependencies[5]);
+
+    //try to select three targets as there are three node groups
+    targets = chooseTarget(3, dataNodesForDependencies[1],
+        chosenNodes, excludedNodes);
+
+    //Even though there are three node groups, verify that
+    //only two targets are selected due to dependencies
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], storagesForDependencies[1]);
+    assertTrue(targets[1].equals(storagesForDependencies[3])
+        || targets[1].equals(storagesForDependencies[4]));
+
+    //verify that all data nodes are in the excluded list
+    assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES);
+    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Sat May  3 11:02:44 2014
@@ -68,6 +68,8 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
@@ -981,10 +983,15 @@ public class TestFsck {
     PrintWriter out = new PrintWriter(result, true);
     InetAddress remoteAddress = InetAddress.getLocalHost();
     FSNamesystem fsName = mock(FSNamesystem.class);
+    BlockManager blockManager = mock(BlockManager.class);
+    DatanodeManager dnManager = mock(DatanodeManager.class);
+
     when(namenode.getNamesystem()).thenReturn(fsName);
     when(fsName.getBlockLocations(anyString(), anyLong(), anyLong(),
         anyBoolean(), anyBoolean(), anyBoolean())).
         thenThrow(new FileNotFoundException());
+    when(fsName.getBlockManager()).thenReturn(blockManager);
+    when(blockManager.getDatanodeManager()).thenReturn(dnManager);
 
     NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
         NUM_REPLICAS, (short)1, remoteAddress);
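Putting the pieces together, a hedged configuration sketch for a namenode that should resolve and honor cross-node dependencies. The dfs.block.replicator.classname key is taken from this patch; the topology keys and the ScriptBasedMappingWithDependency class are believed to match hadoop-common of this era but should be verified against your version, and both script paths are made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DependencyWiringSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Dependency-aware exclusion lives in the node-group policy:
        conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
            "org.apache.hadoop.hdfs.server.blockmanagement."
            + "BlockPlacementPolicyWithNodeGroup");
        // A topology plugin that also reports dependencies:
        conf.set("net.topology.node.switch.mapping.impl",
            "org.apache.hadoop.net.ScriptBasedMappingWithDependency");
        conf.set("net.topology.script.file.name",
            "/etc/hadoop/topology.sh");
        conf.set("net.topology.dependency.script.file.name",
            "/etc/hadoop/dependency.sh");
        // With this in place, DatanodeManager calls getDependency() during
        // datanode registration and placement excludes dependent hosts.
      }
    }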