Author: szetszwo
Date: Sat May  3 11:02:44 2014
New Revision: 1592179

URL: http://svn.apache.org/r1592179
Log:
HDFS-5168. Add cross node dependency support to BlockPlacementPolicy.  
Contributed by Nikola Vujic

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat May  3 11:02:44 2014
@@ -267,6 +267,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6281. Provide option to use the NFS Gateway without having to use the
     Hadoop portmapper. (atm)
 
+    HDFS-5168. Add cross node dependency support to BlockPlacementPolicy.
+    (Nikola Vujic via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-6007. Update documentation about short-circuit local reads (iwasakims

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Sat May  3 11:02:44 2014
@@ -29,6 +29,8 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.Time;
 
 import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
 
@@ -50,6 +52,8 @@ public class DatanodeInfo extends Datano
   private int xceiverCount;
   private String location = NetworkTopology.DEFAULT_RACK;
   private String softwareVersion;
+  private List<String> dependentHostNames = new LinkedList<String>();
+  
   
   // Datanode administrative states
   public enum AdminStates {
@@ -274,6 +278,21 @@ public class DatanodeInfo extends Datano
   public synchronized void setNetworkLocation(String location) {
     this.location = NodeBase.normalize(location);
   }
+  
+  /** Add a hostname to the list of network dependencies */
+  public void addDependentHostName(String hostname) {
+    dependentHostNames.add(hostname);
+  }
+  
+  /** @return host names this datanode has a network dependency on */
+  public List<String> getDependentHostNames() {
+    return dependentHostNames;
+  }
+  
+  /** Sets the network dependencies; keeps a mutable copy of the list */
+  public void setDependentHostNames(List<String> dependencyList) {
+    dependentHostNames = new LinkedList<String>(dependencyList);
+  }
     
   /** A formatted string for reporting the status of the DataNode. */
   public String getDatanodeReport() {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sat May  3 11:02:44 2014
@@ -842,7 +842,7 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
+    if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof 
         BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Sat May  3 11:02:44 2014
@@ -267,7 +267,8 @@ public class BlockManager {
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, stats, datanodeManager.getNetworkTopology());
+        conf, stats, datanodeManager.getNetworkTopology(), 
+        datanodeManager.getHost2DatanodeMap());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 
1000L);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Sat May  3 11:02:44 2014
@@ -139,7 +139,8 @@ public abstract class BlockPlacementPoli
    * @param clusterMap cluster topology
    */
   abstract protected void initialize(Configuration conf,  FSClusterStats 
stats, 
-                                     NetworkTopology clusterMap);
+                                     NetworkTopology clusterMap, 
+                                     Host2NodesMap host2datanodeMap);
     
   /**
    * Get an instance of the configured Block Placement Policy based on the
@@ -153,14 +154,15 @@ public abstract class BlockPlacementPoli
    */
   public static BlockPlacementPolicy getInstance(Configuration conf, 
                                                  FSClusterStats stats,
-                                                 NetworkTopology clusterMap) {
+                                                 NetworkTopology clusterMap,
+                                                 Host2NodesMap 
host2datanodeMap) {
     final Class<? extends BlockPlacementPolicy> replicatorClass = 
conf.getClass(
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
         DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
         BlockPlacementPolicy.class);
     final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
         replicatorClass, conf);
-    replicator.initialize(conf, stats, clusterMap);
+    replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
     return replicator;
   }
   

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Sat May  3 11:02:44 2014
@@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault
   protected boolean considerLoad; 
   private boolean preferLocalNode = true;
   protected NetworkTopology clusterMap;
+  protected Host2NodesMap host2datanodeMap;
   private FSClusterStats stats;
   protected long heartbeatInterval;   // interval for DataNode heartbeats
   private long staleInterval;   // interval used to identify stale DataNodes
@@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault
   protected int tolerateHeartbeatMultiplier;
 
   protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats 
stats,
-                           NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+                           NetworkTopology clusterMap, 
+                           Host2NodesMap host2datanodeMap) {
+    initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   protected BlockPlacementPolicyDefault() {
@@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault
     
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
-                         NetworkTopology clusterMap) {
+                         NetworkTopology clusterMap, 
+                         Host2NodesMap host2datanodeMap) {
     this.considerLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.host2datanodeMap = host2datanodeMap;
     this.heartbeatInterval = conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Sat May  3 11:02:44 2014
@@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase;
 public class BlockPlacementPolicyWithNodeGroup extends 
BlockPlacementPolicyDefault {
 
   protected BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
-      NetworkTopology clusterMap) {
-    initialize(conf, stats, clusterMap);
+      NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
+    initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   protected BlockPlacementPolicyWithNodeGroup() {
@@ -56,8 +56,9 @@ public class BlockPlacementPolicyWithNod
 
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
-          NetworkTopology clusterMap) {
-    super.initialize(conf, stats, clusterMap);
+          NetworkTopology clusterMap, 
+          Host2NodesMap host2datanodeMap) {
+    super.initialize(conf, stats, clusterMap, host2datanodeMap);
   }
 
   /** choose local node of localMachine as the target.
@@ -241,6 +242,36 @@ public class BlockPlacementPolicyWithNod
         countOfExcludedNodes++;
       }
     }
+    
+    countOfExcludedNodes += addDependentNodesToExcludedNodes(
+        chosenNode, excludedNodes);
+    return countOfExcludedNodes;
+  }
+  
+  /**
+   * Add all nodes from a dependent nodes list to excludedNodes.
+   * @return number of new excluded nodes
+   */
+  private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode,
+      Set<Node> excludedNodes) {
+    if (this.host2datanodeMap == null) {
+      return 0;
+    }
+    int countOfExcludedNodes = 0;
+    for(String hostname : chosenNode.getDependentHostNames()) {
+      DatanodeDescriptor node =
+          this.host2datanodeMap.getDataNodeByHostName(hostname);
+      if(node!=null) {
+        if (excludedNodes.add(node)) {
+          countOfExcludedNodes++;
+        }
+      } else {
+        LOG.warn("Not able to find datanode " + hostname
+            + " which has dependency with datanode "
+            + chosenNode.getHostName());
+      }
+    }
+    
     return countOfExcludedNodes;
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Sat May  3 11:02:44 2014
@@ -373,6 +373,11 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
   }
 
+  /** @return the Host2NodesMap */
+  public Host2NodesMap getHost2DatanodeMap() {
+    return this.host2DatanodeMap;
+  }
+
   /**
    * Given datanode address or host name, returns the DatanodeDescriptor for 
the
    * same, or if it doesn't find the datanode, it looks for a machine local and
@@ -678,6 +683,52 @@ public class DatanodeManager {
   }
 
   /**
+   * Resolve a node's dependencies in the network. If the DNS to switch 
+   * mapping fails then this method returns empty list of dependencies 
+   * @param node to get dependencies for
+   * @return List of dependent host names
+   */
+  private List<String> getNetworkDependenciesWithDefault(DatanodeInfo node) {
+    List<String> dependencies;
+    try {
+      dependencies = getNetworkDependencies(node);
+    } catch (UnresolvedTopologyException e) {
+      LOG.error("Unresolved dependency mapping for host " + 
+          node.getHostName() +". Continuing with an empty dependency list");
+      dependencies = Collections.emptyList();
+    }
+    return dependencies;
+  }
+  
+  /**
+   * Resolves a node's dependencies in the network. If the DNS to switch 
+   * mapping fails to get dependencies, then this method throws 
+   * UnresolvedTopologyException. 
+   * @param node to get dependencies for
+   * @return List of dependent host names 
+   * @throws UnresolvedTopologyException if the DNS to switch mapping fails
+   */
+  private List<String> getNetworkDependencies(DatanodeInfo node)
+      throws UnresolvedTopologyException {
+    List<String> dependencies = Collections.emptyList();
+
+    if (dnsToSwitchMapping instanceof DNSToSwitchMappingWithDependency) {
+      //Get dependencies
+      dependencies = 
+          ((DNSToSwitchMappingWithDependency)dnsToSwitchMapping).getDependency(
+              node.getHostName());
+      if(dependencies == null) {
+        LOG.error("The dependency call returned null for host " + 
+            node.getHostName());
+        throw new UnresolvedTopologyException("The dependency call returned " 
+ 
+            "null for host " + node.getHostName());
+      }
+    }
+
+    return dependencies;
+  }
+
+  /**
    * Remove an already decommissioned data node who is neither in include nor
    * exclude hosts lists from the the list of live or dead nodes.  This is used
    * to not display an already decommssioned data node to the operators.
@@ -869,12 +920,14 @@ public class DatanodeManager {
           nodeS.setDisallowed(false); // Node is in the include list
 
           // resolve network location
-          if(this.rejectUnresolvedTopologyDN)
-          {
-            nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));  
+          if(this.rejectUnresolvedTopologyDN) {
+            nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+            nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
           } else {
             nodeS.setNetworkLocation(
                 resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
+            nodeS.setDependentHostNames(
+                getNetworkDependenciesWithDefault(nodeS));
           }
           getNetworkTopology().add(nodeS);
             
@@ -900,9 +953,12 @@ public class DatanodeManager {
         // resolve network location
         if(this.rejectUnresolvedTopologyDN) {
           nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
+          nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
         } else {
           nodeDescr.setNetworkLocation(
               resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
+          nodeDescr.setDependentHostNames(
+              getNetworkDependenciesWithDefault(nodeDescr));
         }
         networktopology.add(nodeDescr);
         nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Sat May  3 11:02:44 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 class Host2NodesMap {
+  private HashMap<String, String> mapHost = new HashMap<String, String>();
   private final HashMap<String, DatanodeDescriptor[]> map
     = new HashMap<String, DatanodeDescriptor[]>();
   private final ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
@@ -69,6 +70,10 @@ class Host2NodesMap {
       }
       
       String ipAddr = node.getIpAddr();
+      String hostname = node.getHostName();
+      
+      mapHost.put(hostname, ipAddr);
+      
       DatanodeDescriptor[] nodes = map.get(ipAddr);
       DatanodeDescriptor[] newNodes;
       if (nodes==null) {
@@ -95,6 +100,7 @@ class Host2NodesMap {
     }
       
     String ipAddr = node.getIpAddr();
+    String hostname = node.getHostName();
     hostmapLock.writeLock().lock();
     try {
 
@@ -105,6 +111,8 @@ class Host2NodesMap {
       if (nodes.length==1) {
         if (nodes[0]==node) {
           map.remove(ipAddr);
+          //remove hostname key since last datanode is removed
+          mapHost.remove(hostname);
           return true;
         } else {
           return false;
@@ -188,12 +196,40 @@ class Host2NodesMap {
     }
   }
 
+  
+
+  /** Get a data node by its hostname. This should be used if only one
+   * datanode service is running on a hostname. If multiple datanodes
+   * are running on a hostname then use getDatanodeByXferAddr instead.
+   * The hostname is mapped to an IP address, then getDatanodeByHost is used.
+   * @return DatanodeDescriptor if found; otherwise null (e.g. null hostname).
+   */
+  DatanodeDescriptor getDataNodeByHostName(String hostname) {
+    if(hostname == null) {
+      return null;
+    }
+    
+    hostmapLock.readLock().lock();
+    try {
+      String ipAddr = mapHost.get(hostname);
+      if(ipAddr == null) {
+        return null;
+      } else {  
+        return getDatanodeByHost(ipAddr);
+      }
+    } finally {
+      hostmapLock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
         .append("[");
-    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
-      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    for(Map.Entry<String, String> host: mapHost.entrySet()) {
+      DatanodeDescriptor[] nodes = map.get(host.getValue());
+      b.append("\n  " + host.getKey() + " => " + host.getValue() + " => "
+          + Arrays.toString(nodes));
     }
     return b.append("\n]").toString();
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Sat May  3 11:02:44 2014
@@ -172,8 +172,10 @@ public class NamenodeFsck {
     this.minReplication = minReplication;
     this.remoteAddress = remoteAddress;
     this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
-        networktopology);
-
+        networktopology,
+        namenode.getNamesystem().getBlockManager().getDatanodeManager()
+        .getHost2DatanodeMap());
+    
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
       if (key.equals("path")) { this.path = pmap.get("path")[0]; }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Sat May  3 11:02:44 2014
@@ -897,29 +897,47 @@ public class DFSTestUtil {
     return getDatanodeDescriptor(ipAddr, 
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
         rackLocation);
   }
+  
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, String hostname) {
+    return getDatanodeDescriptor(ipAddr, 
+        DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
+  }
 
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip) {
-    return createDatanodeStorageInfo(storageID, ip, "defaultRack");
+    return createDatanodeStorageInfo(storageID, ip, "defaultRack", "host");
   }
+  
   public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] 
racks) {
-    return createDatanodeStorageInfos(racks.length, racks);
+    return createDatanodeStorageInfos(racks, null);
+  }
+  
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] 
racks, String[] hostnames) {
+    return createDatanodeStorageInfos(racks.length, racks, hostnames);
+  }
+  
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) {
+    return createDatanodeStorageInfos(n, null, null);
   }
-  public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, 
String... racks) {
+    
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(
+      int n, String[] racks, String[] hostnames) {
     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
     for(int i = storages.length; i > 0; ) {
       final String storageID = "s" + i;
       final String ip = i + "." + i + "." + i + "." + i;
       i--;
-      final String rack = i < racks.length? racks[i]: "defaultRack";
-      storages[i] = createDatanodeStorageInfo(storageID, ip, rack);
+      final String rack = (racks!=null && i < racks.length)? racks[i]: 
"defaultRack";
+      final String hostname = (hostnames!=null && i < hostnames.length)? 
hostnames[i]: "host";
+      storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname);
     }
     return storages;
   }
   public static DatanodeStorageInfo createDatanodeStorageInfo(
-      String storageID, String ip, String rack) {
+      String storageID, String ip, String rack, String hostname) {
     final DatanodeStorage storage = new DatanodeStorage(storageID);
-    final DatanodeDescriptor dn = 
BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage);
+    final DatanodeDescriptor dn = 
BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname);
     return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
   }
   public static DatanodeDescriptor[] toDatanodeDescriptor(
@@ -932,8 +950,8 @@ public class DFSTestUtil {
   }
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
-      int port, String rackLocation) {
-    DatanodeID dnId = new DatanodeID(ipAddr, "host",
+      int port, String rackLocation, String hostname) {
+    DatanodeID dnId = new DatanodeID(ipAddr, hostname,
         UUID.randomUUID().toString(), port,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
@@ -941,6 +959,11 @@ public class DFSTestUtil {
     return new DatanodeDescriptor(dnId, rackLocation);
   }
   
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      int port, String rackLocation) {
+    return getDatanodeDescriptor(ipAddr, port, rackLocation, "host");
+  }
+  
   public static DatanodeRegistration getLocalDatanodeRegistration() {
     return new DatanodeRegistration(getLocalDatanodeID(), new StorageInfo(
         NodeType.DATA_NODE), new ExportedBlockKeys(), 
VersionInfo.getVersion());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Sat May  3 11:02:44 2014
@@ -236,8 +236,13 @@ public class BlockManagerTestUtil {
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
       String rackLocation, DatanodeStorage storage) {
+    return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host");
+  }
+
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, DatanodeStorage storage, String hostname) {
       DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
-          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
+          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
       if (storage != null) {
         dn.updateStorage(storage);
       }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java Sat May  3 11:02:44 2014
@@ -47,11 +47,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 public class TestReplicationPolicyWithNodeGroup {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
   private static final int NUM_OF_DATANODES_BOUNDARY = 6;
   private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
+  private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
   private final Configuration CONF = new HdfsConfiguration();
   private NetworkTopology cluster;
   private NameNode namenode;
@@ -113,7 +115,33 @@ public class TestReplicationPolicyWithNo
 
   private final static DatanodeDescriptor NODE = 
       new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", 
"/d2/r4/n7"));
-
+  
+  private static final DatanodeStorageInfo[] storagesForDependencies;
+  private static final DatanodeDescriptor[] dataNodesForDependencies;
+  static {
+    final String[] racksForDependencies = {
+        "/d1/r1/n1",
+        "/d1/r1/n1",
+        "/d1/r1/n2",
+        "/d1/r1/n2",
+        "/d1/r1/n3",
+        "/d1/r1/n4"
+    };
+    final String[] hostNamesForDependencies = {
+        "h1",
+        "h2",
+        "h3",
+        "h4",
+        "h5",
+        "h6"
+    };
+
+    storagesForDependencies = DFSTestUtil.createDatanodeStorageInfos(
+        racksForDependencies, hostNamesForDependencies);
+    dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
+
+  }
+  
   @Before
   public void setUp() throws Exception {
     FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
@@ -720,5 +748,63 @@ public class TestReplicationPolicyWithNo
     assertEquals(targets.length, 6);
   }
 
-
+  /** Cross-node dependencies must limit which targets can be chosen. */
+  @Test
+  public void testChooseTargetWithDependencies() throws Exception {
+    // Clear the default and more-targets nodes out of the topology.
+    for (int i = 0; i < NUM_OF_DATANODES; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+    for (int i = 0; i < NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
+      if (cluster.contains(node)) {
+        cluster.remove(node);
+      }
+    }
+
+    // Register the dependency test nodes in the topology and host map.
+    Host2NodesMap host2DatanodeMap = namenode.getNamesystem().getBlockManager()
+        .getDatanodeManager().getHost2DatanodeMap();
+    for (int i = 0; i < NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      cluster.add(dataNodesForDependencies[i]);
+      host2DatanodeMap.add(dataNodesForDependencies[i]);
+    }
+
+    // Add dependencies h2 <-> h3 and h4 <-> h5 (array indices 1-2 and 3-4).
+    dataNodesForDependencies[1].addDependentHostName(
+        dataNodesForDependencies[2].getHostName());
+    dataNodesForDependencies[2].addDependentHostName(
+        dataNodesForDependencies[1].getHostName());
+    dataNodesForDependencies[3].addDependentHostName(
+        dataNodesForDependencies[4].getHostName());
+    dataNodesForDependencies[4].addDependentHostName(
+        dataNodesForDependencies[3].getHostName());
+
+    // Update heartbeats for the newly registered nodes.
+    for (int i = 0; i < NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      updateHeartbeatWithUsage(dataNodesForDependencies[i],
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
+    }
+
+    List<DatanodeStorageInfo> chosenNodes =
+        new ArrayList<DatanodeStorageInfo>();
+    Set<Node> excludedNodes = new HashSet<Node>();
+    excludedNodes.add(dataNodesForDependencies[5]);
+    // Request 3 targets; excluding h6 (alone on n4) leaves 3 node groups.
+    DatanodeStorageInfo[] targets = chooseTarget(3, dataNodesForDependencies[1],
+        chosenNodes, excludedNodes);
+
+    // Dependencies allow only two targets: the writer (h2) and h4 or h5.
+    assertEquals(2, targets.length);
+    assertEquals(storagesForDependencies[1], targets[0]);
+    assertTrue(targets[1].equals(storagesForDependencies[3])
+        || targets[1].equals(storagesForDependencies[4]));
+
+    // Every data node should now be in the excluded set.
+    assertEquals(NUM_OF_DATANODES_FOR_DEPENDENCIES, excludedNodes.size());
+    for (int i = 0; i < NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
+      assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
+    }
+  }
 }

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=1592179&r1=1592178&r2=1592179&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
 Sat May  3 11:02:44 2014
@@ -68,6 +68,8 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
@@ -981,10 +983,15 @@ public class TestFsck {
     PrintWriter out = new PrintWriter(result, true);
     InetAddress remoteAddress = InetAddress.getLocalHost();
     FSNamesystem fsName = mock(FSNamesystem.class);
+    BlockManager blockManager = mock(BlockManager.class);
+    DatanodeManager dnManager = mock(DatanodeManager.class);
+    
     when(namenode.getNamesystem()).thenReturn(fsName);
     when(fsName.getBlockLocations(anyString(), anyLong(), anyLong(),
         anyBoolean(), anyBoolean(), anyBoolean())).
         thenThrow(new FileNotFoundException()) ;
+    when(fsName.getBlockManager()).thenReturn(blockManager);
+    when(blockManager.getDatanodeManager()).thenReturn(dnManager);
 
     NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
         NUM_REPLICAS, (short)1, remoteAddress);


Reply via email to