Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Aug 19 23:49:39 2014
@@ -234,18 +234,6 @@ public class DatanodeDescriptor extends 
     updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
-  /**
-   * Add data-node to the block. Add block to the head of the list of blocks
-   * belonging to the data-node.
-   */
-  public boolean addBlock(String storageID, BlockInfo b) {
-    DatanodeStorageInfo s = getStorageInfo(storageID);
-    if (s != null) {
-      return s.addBlock(b);
-    }
-    return false;
-  }
-
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
@@ -259,6 +247,15 @@ public class DatanodeDescriptor extends 
     }
   }
 
+  public StorageReport[] getStorageReports() {
+    final DatanodeStorageInfo[] infos = getStorageInfos();
+    final StorageReport[] reports = new StorageReport[infos.length];
+    for(int i = 0; i < infos.length; i++) {
+      reports[i] = infos[i].toStorageReport();
+    }
+    return reports;
+  }
+
   boolean hasStaleStorages() {
     synchronized (storageMap) {
       for (DatanodeStorageInfo storage : storageMap.values()) {
@@ -275,13 +272,10 @@ public class DatanodeDescriptor extends 
    * data-node from the block.
    */
   boolean removeBlock(BlockInfo b) {
-    int index = b.findStorageInfo(this);
+    final DatanodeStorageInfo s = b.findStorageInfo(this);
     // if block exists on this datanode
-    if (index >= 0) {
-      DatanodeStorageInfo s = b.getStorageInfo(index);
-      if (s != null) {
-        return s.removeBlock(b);
-      }
+    if (s != null) {
+      return s.removeBlock(b);
     }
     return false;
   }
@@ -298,24 +292,6 @@ public class DatanodeDescriptor extends 
     return false;
   }
 
-  /**
-   * Replace specified old block with a new one in the DataNodeDescriptor.
-   *
-   * @param oldBlock - block to be replaced
-   * @param newBlock - a replacement block
-   * @return the new block
-   */
-  public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
-    int index = oldBlock.findStorageInfo(this);
-    DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
-    boolean done = s.removeBlock(oldBlock);
-    assert done : "Old block should belong to the data-node when replacing";
-
-    done = s.addBlock(newBlock);
-    assert done : "New block should not belong to the data-node when replacing";
-    return newBlock;
-  }
-
   public void resetBlocks() {
     setCapacity(0);
     setRemaining(0);

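The change above drops DatanodeDescriptor.addBlock/replaceBlock and simplifies removeBlock: BlockInfo.findStorageInfo(this) now hands back the DatanodeStorageInfo itself instead of an index, so a single null check replaces the old index/getStorageInfo two-step. A minimal, self-contained sketch of that flow (Storage/Block are stand-in types, not the real HDFS classes):

    import java.util.HashMap;
    import java.util.Map;

    class StorageInfoLookupSketch {
      static class Storage {
        final Map<Long, Boolean> blocks = new HashMap<Long, Boolean>();
        boolean removeBlock(long blockId) { return blocks.remove(blockId) != null; }
      }
      static class Block {
        final long id;
        Storage storage;  // the storage on this node holding the block, if any
        Block(long id) { this.id = id; }
        Storage findStorageInfo() { return storage; }  // may return null
      }

      // mirrors the shape of the new DatanodeDescriptor.removeBlock(BlockInfo)
      static boolean removeBlock(Block b) {
        final Storage s = b.findStorageInfo();
        if (s != null) {  // block exists on this datanode
          return s.removeBlock(b.id);
        }
        return false;
      }

      public static void main(String[] args) {
        Storage st = new Storage();
        Block b = new Block(42L);
        st.blocks.put(b.id, Boolean.TRUE);
        b.storage = st;
        System.out.println(removeBlock(b));             // true: removed
        System.out.println(removeBlock(new Block(7L))); // false: not present
      }
    }
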
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Aug 19 23:49:39 2014
@@ -135,7 +135,10 @@ public class DatanodeManager {
   
   /** The number of stale DataNodes */
   private volatile int numStaleNodes;
-  
+
+  /** The number of stale storages */
+  private volatile int numStaleStorages;
+
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
    * according to the NetworkTopology.
@@ -331,9 +334,22 @@ public class DatanodeManager {
     return heartbeatManager;
   }
 
+  private boolean isInactive(DatanodeInfo datanode) {
+    if (datanode.isDecommissioned()) {
+      return true;
+    }
+
+    if (avoidStaleDataNodesForRead) {
+      return datanode.isStale(staleInterval);
+    }
+      
+    return false;
+  }
+  
   /** Sort the located blocks by the distance to the target host. */
   public void sortLocatedBlocks(final String targethost,
-      final List<LocatedBlock> locatedblocks) {
+      final List<LocatedBlock> locatedblocks,
+      boolean randomizeBlockLocationsPerBlock) {
     //sort the blocks
    // Since the node manager and the datanode may be separated,
    // we should get the node here, not just the datanode.
@@ -351,9 +367,17 @@ public class DatanodeManager {
         DFSUtil.DECOM_COMPARATOR;
         
     for (LocatedBlock b : locatedblocks) {
-      networktopology.pseudoSortByDistance(client, b.getLocations());
+      DatanodeInfo[] di = b.getLocations();
       // Move decommissioned/stale datanodes to the bottom
-      Arrays.sort(b.getLocations(), comparator);
+      Arrays.sort(di, comparator);
+      
+      int lastActiveIndex = di.length - 1;
+      while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
+          --lastActiveIndex;
+      }
+      int activeLen = lastActiveIndex + 1;      
+      networktopology.sortByDistance(client, b.getLocations(), activeLen, b
+          .getBlock().getBlockId(), randomizeBlockLocationsPerBlock);
     }
   }
   
@@ -373,6 +397,11 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
   }
 
+  /** @return the Host2NodesMap */
+  public Host2NodesMap getHost2DatanodeMap() {
+    return this.host2DatanodeMap;
+  }
+
   /**
   * Given datanode address or host name, returns the DatanodeDescriptor for the
    * same, or if it doesn't find the datanode, it looks for a machine local and
@@ -435,9 +464,9 @@ public class DatanodeManager {
   }
 
   /**
-   * Get data node by storage ID.
+   * Get data node by datanode ID.
    * 
-   * @param nodeID
+   * @param nodeID datanode ID
    * @return DatanodeDescriptor or null if the node is not found.
    * @throws UnregisteredNodeException
    */
@@ -678,6 +707,52 @@ public class DatanodeManager {
   }
 
   /**
+   * Resolve a node's dependencies in the network. If the DNS to switch 
+   * mapping fails, this method returns an empty list of dependencies.
+   * @param node to get dependencies for
+   * @return List of dependent host names
+   */
+  private List<String> getNetworkDependenciesWithDefault(DatanodeInfo node) {
+    List<String> dependencies;
+    try {
+      dependencies = getNetworkDependencies(node);
+    } catch (UnresolvedTopologyException e) {
+      LOG.error("Unresolved dependency mapping for host " + 
+          node.getHostName() +". Continuing with an empty dependency list");
+      dependencies = Collections.emptyList();
+    }
+    return dependencies;
+  }
+  
+  /**
+   * Resolves a node's dependencies in the network. If the DNS to switch 
+   * mapping fails to get dependencies, then this method throws 
+   * UnresolvedTopologyException. 
+   * @param node to get dependencies for
+   * @return List of dependent host names 
+   * @throws UnresolvedTopologyException if the DNS to switch mapping fails
+   */
+  private List<String> getNetworkDependencies(DatanodeInfo node)
+      throws UnresolvedTopologyException {
+    List<String> dependencies = Collections.emptyList();
+
+    if (dnsToSwitchMapping instanceof DNSToSwitchMappingWithDependency) {
+      //Get dependencies
+      dependencies = 
+          ((DNSToSwitchMappingWithDependency)dnsToSwitchMapping).getDependency(
+              node.getHostName());
+      if(dependencies == null) {
+        LOG.error("The dependency call returned null for host " + 
+            node.getHostName());
+        throw new UnresolvedTopologyException("The dependency call returned " +
+            "null for host " + node.getHostName());
+      }
+    }
+
+    return dependencies;
+  }
+
+  /**
    * Remove an already decommissioned data node who is neither in include nor
   * exclude hosts lists from the list of live or dead nodes.  This is used
   * to not display an already decommissioned data node to the operators.
@@ -749,7 +824,9 @@ public class DatanodeManager {
   }
 
   /** Start decommissioning the specified datanode. */
-  private void startDecommission(DatanodeDescriptor node) {
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       for (DatanodeStorageInfo storage : node.getStorageInfos()) {
         LOG.info("Start Decommissioning " + node + " " + storage
@@ -869,12 +946,14 @@ public class DatanodeManager {
           nodeS.setDisallowed(false); // Node is in the include list
 
           // resolve network location
-          if(this.rejectUnresolvedTopologyDN)
-          {
-            nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));  
+          if(this.rejectUnresolvedTopologyDN) {
+            nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+            nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
           } else {
             nodeS.setNetworkLocation(
                 resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
+            nodeS.setDependentHostNames(
+                getNetworkDependenciesWithDefault(nodeS));
           }
           getNetworkTopology().add(nodeS);
             
@@ -900,9 +979,12 @@ public class DatanodeManager {
         // resolve network location
         if(this.rejectUnresolvedTopologyDN) {
           nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
+          nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
         } else {
           nodeDescr.setNetworkLocation(
               resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
+          nodeDescr.setDependentHostNames(
+              getNetworkDependenciesWithDefault(nodeDescr));
         }
         networktopology.add(nodeDescr);
         nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
@@ -1001,34 +1083,23 @@ public class DatanodeManager {
 
   /** @return the number of dead datanodes. */
   public int getNumDeadDataNodes() {
-    int numDead = 0;
-    synchronized (datanodeMap) {   
-      for(DatanodeDescriptor dn : datanodeMap.values()) {
-        if (isDatanodeDead(dn) ) {
-          numDead++;
-        }
-      }
-    }
-    return numDead;
+    return getDatanodeListForReport(DatanodeReportType.DEAD).size();
   }
 
   /** @return list of datanodes where decommissioning is in progress. */
   public List<DatanodeDescriptor> getDecommissioningNodes() {
-    namesystem.readLock();
-    try {
-      final List<DatanodeDescriptor> decommissioningNodes
-          = new ArrayList<DatanodeDescriptor>();
-      final List<DatanodeDescriptor> results = getDatanodeListForReport(
-          DatanodeReportType.LIVE);
-      for(DatanodeDescriptor node : results) {
-        if (node.isDecommissionInProgress()) {
-          decommissioningNodes.add(node);
-        }
+    // There is no need to take namesystem reader lock as
+    // getDatanodeListForReport will synchronize on datanodeMap
+    final List<DatanodeDescriptor> decommissioningNodes
+        = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> results = getDatanodeListForReport(
+        DatanodeReportType.LIVE);
+    for(DatanodeDescriptor node : results) {
+      if (node.isDecommissionInProgress()) {
+        decommissioningNodes.add(node);
       }
-      return decommissioningNodes;
-    } finally {
-      namesystem.readUnlock();
     }
+    return decommissioningNodes;
   }
   
   /* Getter and Setter for stale DataNodes related attributes */
@@ -1074,6 +1145,22 @@ public class DatanodeManager {
     return this.numStaleNodes;
   }
 
+  /**
+   * Get the number of content stale storages.
+   */
+  public int getNumStaleStorages() {
+    return numStaleStorages;
+  }
+
+  /**
+   * Set the number of content stale storages.
+   *
+   * @param numStaleStorages The number of content stale storages.
+   */
+  void setNumStaleStorages(int numStaleStorages) {
+    this.numStaleStorages = numStaleStorages;
+  }
+
   /** Fetch live and dead datanodes. */
   public void fetchDatanodes(final List<DatanodeDescriptor> live, 
      final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
@@ -1081,23 +1168,20 @@ public class DatanodeManager {
-      throw new HadoopIllegalArgumentException("Both live and dead lists are null");
     }
 
-    namesystem.readLock();
-    try {
-      final List<DatanodeDescriptor> results =
-          getDatanodeListForReport(DatanodeReportType.ALL);    
-      for(DatanodeDescriptor node : results) {
-        if (isDatanodeDead(node)) {
-          if (dead != null) {
-            dead.add(node);
-          }
-        } else {
-          if (live != null) {
-            live.add(node);
-          }
+    // There is no need to take namesystem reader lock as
+    // getDatanodeListForReport will synchronize on datanodeMap
+    final List<DatanodeDescriptor> results =
+        getDatanodeListForReport(DatanodeReportType.ALL);
+    for(DatanodeDescriptor node : results) {
+      if (isDatanodeDead(node)) {
+        if (dead != null) {
+          dead.add(node);
+        }
+      } else {
+        if (live != null) {
+          live.add(node);
         }
       }
-    } finally {
-      namesystem.readUnlock();
     }
     
     if (removeDecommissionNode) {
@@ -1189,10 +1273,15 @@ public class DatanodeManager {
   /** For generating datanode reports */
   public List<DatanodeDescriptor> getDatanodeListForReport(
       final DatanodeReportType type) {
-    boolean listLiveNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.LIVE;
-    boolean listDeadNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.DEAD;
+    final boolean listLiveNodes =
+        type == DatanodeReportType.ALL ||
+        type == DatanodeReportType.LIVE;
+    final boolean listDeadNodes =
+        type == DatanodeReportType.ALL ||
+        type == DatanodeReportType.DEAD;
+    final boolean listDecommissioningNodes =
+        type == DatanodeReportType.ALL ||
+        type == DatanodeReportType.DECOMMISSIONING;
 
     ArrayList<DatanodeDescriptor> nodes;
     final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet();
@@ -1203,7 +1292,10 @@ public class DatanodeManager {
       nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size());
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         final boolean isDead = isDatanodeDead(dn);
-        if ((listLiveNodes && !isDead) || (listDeadNodes && isDead)) {
+        final boolean isDecommissioning = dn.isDecommissionInProgress();
+        if ((listLiveNodes && !isDead) ||
+            (listDeadNodes && isDead) ||
+            (listDecommissioningNodes && isDecommissioning)) {
             nodes.add(dn);
         }
         foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn));

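The reworked sortLocatedBlocks above first lets the decommissioned/stale comparator sink inactive replicas to the tail, then sorts only the active prefix by network distance. A self-contained sketch of that two-phase ordering (Node and its fields are illustrative stand-ins, not the real DatanodeInfo/NetworkTopology API):

    import java.util.Arrays;
    import java.util.Comparator;

    class SortLocatedBlocksSketch {
      static class Node {
        final String name; final boolean inactive; final int distance;
        Node(String name, boolean inactive, int distance) {
          this.name = name; this.inactive = inactive; this.distance = distance;
        }
      }

      static void sortLocations(Node[] locations) {
        // phase 1: decommissioned/stale nodes sink to the bottom
        Arrays.sort(locations, Comparator.comparing((Node n) -> n.inactive));
        int lastActiveIndex = locations.length - 1;
        while (lastActiveIndex > 0 && locations[lastActiveIndex].inactive) {
          --lastActiveIndex;
        }
        int activeLen = lastActiveIndex + 1;
        // phase 2: only the active prefix is ordered by distance to the reader
        Arrays.sort(locations, 0, activeLen,
            Comparator.comparingInt((Node n) -> n.distance));
      }

      public static void main(String[] args) {
        Node[] locs = { new Node("far", false, 9), new Node("stale", true, 1),
                        new Node("near", false, 2) };
        sortLocations(locs);
        for (Node n : locs) {
          System.out.println(n.name);  // near, far, stale
        }
      }
    }
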
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Tue Aug 19 23:49:39 2014
@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
   /** @return the xceiver count */
   public int getXceiverCount();
 
+  /** @return total xceiver count for non-decommission(ing|ed) nodes */
+  public int getInServiceXceiverCount();
+  
+  /** @return number of non-decommission(ing|ed) nodes */
+  public int getNumDatanodesInService();
+  
   /**
    * @return the total used space by data nodes for non-DFS purposes
    * such as storing temporary files on the local file system

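Together the two new getters are enough to derive a per-node load figure. One plausible use (a hypothetical helper, not part of this patch) is averaging the in-service xceiver total over the in-service node count, guarding the zero-node case:

    class InServiceLoadSketch {
      // hypothetical helper: average xceivers per in-service node
      static double averageXceiversPerNode(int inServiceXceiverCount,
                                           int numDatanodesInService) {
        return numDatanodesInService == 0
            ? 0.0
            : (double) inServiceXceiverCount / numDatanodesInService;
      }

      public static void main(String[] args) {
        System.out.println(averageXceiversPerNode(120, 40)); // 3.0
        System.out.println(averageXceiversPerNode(0, 0));    // 0.0
      }
    }
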
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -206,13 +207,29 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  boolean addBlock(BlockInfo b) {
-    if(!b.addStorage(this))
-      return false;
+  public boolean addBlock(BlockInfo b) {
+    // First check whether the block belongs to a different storage
+    // on the same DN.
+    boolean replaced = false;
+    DatanodeStorageInfo otherStorage =
+        b.findStorageInfo(getDatanodeDescriptor());
+
+    if (otherStorage != null) {
+      if (otherStorage != this) {
+        // The block belongs to a different storage. Remove it first.
+        otherStorage.removeBlock(b);
+        replaced = true;
+      } else {
+        // The block is already associated with this storage.
+        return false;
+      }
+    }
+
     // add to the head of the data-node list
+    b.addStorage(this);
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return true;
+    return !replaced;
   }
 
   boolean removeBlock(BlockInfo b) {
@@ -290,4 +307,27 @@ public class DatanodeStorageInfo {
   public String toString() {
     return "[" + storageType + "]" + storageID + ":" + state;
   }
+  
+  StorageReport toStorageReport() {
+    return new StorageReport(
+        new DatanodeStorage(storageID, state, storageType),
+        false, capacity, dfsUsed, remaining, blockPoolUsed);
+  }
+
+  /** @return the first {@link DatanodeStorageInfo} corresponding to
+   *          the given datanode
+   */
+  static DatanodeStorageInfo getDatanodeStorageInfo(
+      final Iterable<DatanodeStorageInfo> infos,
+      final DatanodeDescriptor datanode) {
+    if (datanode == null) {
+      return null;
+    }
+    for(DatanodeStorageInfo storage : infos) {
+      if (storage.getDatanodeDescriptor() == datanode) {
+        return storage;
+      }
+    }
+    return null;
+  }
 }

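The reworked addBlock treats a block already held by a sibling storage on the same datanode as a move rather than a new replica, and returns true only for a genuinely new addition. A stand-in sketch of that contract (simplified types; the real code also maintains the per-storage block list linkage):

    import java.util.HashSet;
    import java.util.Set;

    class AddBlockSketch {
      static class Storage {
        final String node;  // owning datanode id
        final Set<Long> blocks = new HashSet<Long>();
        Storage(String node) { this.node = node; }

        // mirrors the new contract: true only when the block is genuinely
        // new to this datanode (neither moved between storages nor duplicate)
        boolean addBlock(long blockId, Storage currentHolder) {
          boolean replaced = false;
          if (currentHolder != null && currentHolder.node.equals(this.node)) {
            if (currentHolder != this) {
              currentHolder.blocks.remove(blockId); // moved between storages
              replaced = true;
            } else {
              return false;                          // already on this storage
            }
          }
          blocks.add(blockId);
          return !replaced;
        }
      }

      public static void main(String[] args) {
        Storage disk = new Storage("dn1");
        Storage ssd  = new Storage("dn1");
        System.out.println(disk.addBlock(1L, null)); // true: new replica
        System.out.println(ssd.addBlock(1L, disk));  // false: moved disk->ssd
        System.out.println(ssd.addBlock(1L, ssd));   // false: duplicate
      }
    }
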
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Tue Aug 19 23:49:39 2014
@@ -151,6 +151,16 @@ class HeartbeatManager implements Datano
   }
   
   @Override
+  public synchronized int getInServiceXceiverCount() {
+    return stats.nodesInServiceXceiverCount;
+  }
+  
+  @Override
+  public synchronized int getNumDatanodesInService() {
+    return stats.nodesInService;
+  }
+  
+  @Override
   public synchronized long getCacheCapacity() {
     return stats.cacheCapacity;
   }
@@ -178,7 +188,7 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void register(final DatanodeDescriptor d) {
-    if (!datanodes.contains(d)) {
+    if (!d.isAlive) {
       addDatanode(d);
 
       //update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void addDatanode(final DatanodeDescriptor d) {
+    // update in-service node count
+    stats.add(d);
     datanodes.add(d);
     d.isAlive = true;
   }
@@ -244,6 +256,7 @@ class HeartbeatManager implements Datano
       DatanodeID dead = null;
       // check the number of stale nodes
       int numOfStaleNodes = 0;
+      int numOfStaleStorages = 0;
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
           if (dead == null && dm.isDatanodeDead(d)) {
@@ -253,10 +266,17 @@ class HeartbeatManager implements Datano
           if (d.isStale(dm.getStaleInterval())) {
             numOfStaleNodes++;
           }
+          DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
+          for(DatanodeStorageInfo storageInfo : storageInfos) {
+            if (storageInfo.areBlockContentsStale()) {
+              numOfStaleStorages++;
+            }
+          }
         }
         
         // Set the number of stale nodes in the DatanodeManager
         dm.setNumStaleNodes(numOfStaleNodes);
+        dm.setNumStaleStorages(numOfStaleStorages);
       }
 
       allAlive = dead == null;
@@ -323,6 +343,9 @@ class HeartbeatManager implements Datano
     private long cacheCapacity = 0L;
     private long cacheUsed = 0L;
 
+    private int nodesInService = 0;
+    private int nodesInServiceXceiverCount = 0;
+
     private int expiredHeartbeats = 0;
 
     private void add(final DatanodeDescriptor node) {
@@ -330,6 +353,8 @@ class HeartbeatManager implements Datano
       blockPoolUsed += node.getBlockPoolUsed();
       xceiverCount += node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService++;
+        nodesInServiceXceiverCount += node.getXceiverCount();
         capacityTotal += node.getCapacity();
         capacityRemaining += node.getRemaining();
       } else {
@@ -344,6 +369,8 @@ class HeartbeatManager implements Datano
       blockPoolUsed -= node.getBlockPoolUsed();
       xceiverCount -= node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService--;
+        nodesInServiceXceiverCount -= node.getXceiverCount();
         capacityTotal -= node.getCapacity();
         capacityRemaining -= node.getRemaining();
       } else {

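The two new counters follow the existing Stats pattern: every aggregate is maintained incrementally in add()/subtract(), so the synchronized getters stay O(1). A reduced sketch of just the in-service bookkeeping (stand-in types; the real Stats class tracks many more fields):

    class InServiceStatsSketch {
      static class Node {
        final boolean decommissioned; final int xceivers;
        Node(boolean d, int x) { decommissioned = d; xceivers = x; }
      }

      private int nodesInService = 0;
      private int nodesInServiceXceiverCount = 0;

      void add(Node n) {        // called when a datanode is added/updated
        if (!n.decommissioned) {
          nodesInService++;
          nodesInServiceXceiverCount += n.xceivers;
        }
      }
      void subtract(Node n) {   // called before a datanode's stats change
        if (!n.decommissioned) {
          nodesInService--;
          nodesInServiceXceiverCount -= n.xceivers;
        }
      }

      public static void main(String[] args) {
        InServiceStatsSketch stats = new InServiceStatsSketch();
        stats.add(new Node(false, 5));
        stats.add(new Node(true, 7));   // decommissioning: not counted
        System.out.println(stats.nodesInService);             // 1
        System.out.println(stats.nodesInServiceXceiverCount); // 5
      }
    }
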
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Tue Aug 19 23:49:39 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 class Host2NodesMap {
+  private HashMap<String, String> mapHost = new HashMap<String, String>();
   private final HashMap<String, DatanodeDescriptor[]> map
     = new HashMap<String, DatanodeDescriptor[]>();
   private final ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
@@ -69,6 +70,10 @@ class Host2NodesMap {
       }
       
       String ipAddr = node.getIpAddr();
+      String hostname = node.getHostName();
+      
+      mapHost.put(hostname, ipAddr);
+      
       DatanodeDescriptor[] nodes = map.get(ipAddr);
       DatanodeDescriptor[] newNodes;
       if (nodes==null) {
@@ -95,6 +100,7 @@ class Host2NodesMap {
     }
       
     String ipAddr = node.getIpAddr();
+    String hostname = node.getHostName();
     hostmapLock.writeLock().lock();
     try {
 
@@ -105,6 +111,8 @@ class Host2NodesMap {
       if (nodes.length==1) {
         if (nodes[0]==node) {
           map.remove(ipAddr);
+          //remove hostname key since last datanode is removed
+          mapHost.remove(hostname);
           return true;
         } else {
           return false;
@@ -188,12 +196,40 @@ class Host2NodesMap {
     }
   }
 
+  
+
+  /** get a data node by its hostname. This should be used if only one 
+   * datanode service is running on a hostname. If multiple datanodes
+   * are running on a hostname then use methods getDataNodeByXferAddr and
+   * getDataNodeByHostNameAndPort.
+   * @return DatanodeDescriptor if found; otherwise null.
+   */
+  DatanodeDescriptor getDataNodeByHostName(String hostname) {
+    if(hostname == null) {
+      return null;
+    }
+    
+    hostmapLock.readLock().lock();
+    try {
+      String ipAddr = mapHost.get(hostname);
+      if(ipAddr == null) {
+        return null;
+      } else {  
+        return getDatanodeByHost(ipAddr);
+      }
+    } finally {
+      hostmapLock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
         .append("[");
-    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
-      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    for(Map.Entry<String, String> host: mapHost.entrySet()) {
+      DatanodeDescriptor[] e = map.get(host.getValue());
+      b.append("\n  " + host.getKey() + " => "+host.getValue() + " => " 
+          + Arrays.asList(e));
     }
     return b.append("\n]").toString();
   }

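Host2NodesMap now keeps a hostname-to-IP map (mapHost) in front of the existing IP-to-datanodes map, so getDataNodeByHostName resolves in two hash lookups. A self-contained sketch of the layered lookup (plain strings stand in for DatanodeDescriptor, and locking is omitted):

    import java.util.HashMap;
    import java.util.Map;

    class HostLookupSketch {
      final Map<String, String> mapHost = new HashMap<String, String>(); // hostname -> ip
      final Map<String, String[]> map = new HashMap<String, String[]>(); // ip -> datanodes

      String getDataNodeByHostName(String hostname) {
        String ip = mapHost.get(hostname);  // first lookup: hostname -> ip
        if (ip == null) {
          return null;
        }
        String[] nodes = map.get(ip);       // second lookup: ip -> datanodes
        return (nodes == null || nodes.length == 0) ? null : nodes[0];
      }

      public static void main(String[] args) {
        HostLookupSketch m = new HostLookupSketch();
        m.mapHost.put("dn1.example.com", "10.0.0.5");
        m.map.put("10.0.0.5", new String[] { "dn1" });
        System.out.println(m.getDataNodeByHostName("dn1.example.com")); // dn1
        System.out.println(m.getDataNodeByHostName("missing"));         // null
      }
    }
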
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Tue Aug 19 23:49:39 2014
@@ -18,16 +18,25 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdfs.DFSUtil;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Keeps a Collection for every named machine containing blocks
@@ -36,16 +45,36 @@ import org.apache.hadoop.hdfs.util.Light
  */
 @InterfaceAudience.Private
 class InvalidateBlocks {
-  /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, LightWeightHashSet<Block>> node2blocks =
-      new TreeMap<String, LightWeightHashSet<Block>>();
+  /** Mapping: DatanodeInfo -> Collection of Blocks */
+  private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
-  private final DatanodeManager datanodeManager;
+  private final int blockInvalidateLimit;
 
-  InvalidateBlocks(final DatanodeManager datanodeManager) {
-    this.datanodeManager = datanodeManager;
+  /**
+   * The period of pending time for block invalidation since the NameNode
+   * startup
+   */
+  private final long pendingPeriodInMs;
+  /** the startup time */
+  private final long startupTime = Time.monotonicNow();
+
+  InvalidateBlocks(final int blockInvalidateLimit, long pendingPeriodInMs) {
+    this.blockInvalidateLimit = blockInvalidateLimit;
+    this.pendingPeriodInMs = pendingPeriodInMs;
+    printBlockDeletionTime(BlockManager.LOG);
+  }
+
+  private void printBlockDeletionTime(final Log log) {
+    log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY
+        + " is set to " + DFSUtil.durationToString(pendingPeriodInMs));
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss");
+    Calendar calendar = new GregorianCalendar();
+    calendar.add(Calendar.SECOND, (int) (this.pendingPeriodInMs / 1000));
+    log.info("The block deletion will start around "
+        + sdf.format(calendar.getTime()));
   }
 
  /** @return the number of blocks to be invalidated. */
@@ -58,12 +87,9 @@ class InvalidateBlocks {
    * invalidation. Blocks are compared including their generation stamps:
    * if a block is pending invalidation but with a different generation stamp,
    * returns false.
-   * @param storageID the storage to check
-   * @param the block to look for
-   * 
    */
-  synchronized boolean contains(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> s = node2blocks.get(storageID);
+  synchronized boolean contains(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> s = node2blocks.get(dn);
     if (s == null) {
       return false; // no invalidate blocks for this storage ID
     }
@@ -78,10 +104,10 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode);
     if (set == null) {
       set = new LightWeightHashSet<Block>();
-      node2blocks.put(datanode.getDatanodeUuid(), set);
+      node2blocks.put(datanode, set);
     }
     if (set.add(block)) {
       numBlocks++;
@@ -93,20 +119,20 @@ class InvalidateBlocks {
   }
 
   /** Remove a storage from the invalidatesSet */
-  synchronized void remove(final String storageID) {
-    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
+  synchronized void remove(final DatanodeInfo dn) {
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
   }
 
   /** Remove the block from the specified storage. */
-  synchronized void remove(final String storageID, final Block block) {
-    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
+  synchronized void remove(final DatanodeInfo dn, final Block block) {
+    final LightWeightHashSet<Block> v = node2blocks.get(dn);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
-        node2blocks.remove(storageID);
+        node2blocks.remove(dn);
       }
     }
   }
@@ -120,34 +146,50 @@ class InvalidateBlocks {
       return;
     }
 
-    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+    for(Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
       final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
-        out.println(datanodeManager.getDatanode(entry.getKey()));
+        out.println(entry.getKey());
         out.println(blocks);
       }
     }
   }
 
   /** @return a list of the storage IDs. */
-  synchronized List<String> getStorageIDs() {
-    return new ArrayList<String>(node2blocks.keySet());
+  synchronized List<DatanodeInfo> getDatanodes() {
+    return new ArrayList<DatanodeInfo>(node2blocks.keySet());
+  }
+
+  /**
+   * @return the remaining pending time
+   */
+  @VisibleForTesting
+  long getInvalidationDelay() {
+    return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
   }
 
-  synchronized List<Block> invalidateWork(
-      final String storageId, final DatanodeDescriptor dn) {
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+  synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
+    final long delay = getInvalidationDelay();
+    if (delay > 0) {
+      if (BlockManager.LOG.isDebugEnabled()) {
+        BlockManager.LOG
+            .debug("Block deletion is delayed during NameNode startup. "
+                + "The deletion will start after " + delay + " ms.");
+      }
+      return null;
+    }
+    final LightWeightHashSet<Block> set = node2blocks.get(dn);
     if (set == null) {
       return null;
     }
 
     // # blocks that can be sent in one message is limited
-    final int limit = datanodeManager.blockInvalidateLimit;
+    final int limit = blockInvalidateLimit;
     final List<Block> toInvalidate = set.pollN(limit);
 
     // If we send everything in this message, remove this node entry
     if (set.isEmpty()) {
-      remove(storageId);
+      remove(dn);
     }
 
     dn.addBlocksToBeInvalidated(toInvalidate);

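invalidateWork is now gated on a configurable pending period after NameNode startup: getInvalidationDelay() stays positive until pendingPeriodInMs has elapsed, and no deletion work is handed out while it is. A stand-in sketch of the gate, using System.nanoTime() in place of Time.monotonicNow():

    class InvalidationDelaySketch {
      final long pendingPeriodInMs;
      final long startupTime = System.nanoTime() / 1_000_000L; // monotonic ms

      InvalidationDelaySketch(long pendingPeriodInMs) {
        this.pendingPeriodInMs = pendingPeriodInMs;
      }

      // positive while block deletion must still wait
      long getInvalidationDelay() {
        long nowMs = System.nanoTime() / 1_000_000L;
        return pendingPeriodInMs - (nowMs - startupTime);
      }

      boolean mayInvalidate() {
        return getInvalidationDelay() <= 0;
      }

      public static void main(String[] args) throws InterruptedException {
        InvalidationDelaySketch s = new InvalidationDelaySketch(50);
        System.out.println(s.mayInvalidate()); // false: in pending period
        Thread.sleep(60);
        System.out.println(s.mayInvalidate()); // true: delay elapsed
      }
    }
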
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Tue Aug 19 23:49:39 2014
@@ -23,6 +23,7 @@ import java.util.Queue;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -41,14 +42,12 @@ class PendingDataNodeMessages {
     
   static class ReportedBlockInfo {
     private final Block block;
-    private final DatanodeDescriptor dn;
-    private final String storageID;
+    private final DatanodeStorageInfo storageInfo;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
+    ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
         ReplicaState reportedState) {
-      this.dn = dn;
-      this.storageID = storageID;
+      this.storageInfo = storageInfo;
       this.block = block;
       this.reportedState = reportedState;
     }
@@ -57,30 +56,48 @@ class PendingDataNodeMessages {
       return block;
     }
 
-    DatanodeDescriptor getNode() {
-      return dn;
-    }
-    
-    String getStorageID() {
-      return storageID;
-    }
-
     ReplicaState getReportedState() {
       return reportedState;
     }
+    
+    DatanodeStorageInfo getStorageInfo() {
+      return storageInfo;
+    }
 
     @Override
     public String toString() {
-      return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+      return "ReportedBlockInfo [block=" + block + ", dn="
+          + storageInfo.getDatanodeDescriptor()
           + ", reportedState=" + reportedState + "]";
     }
   }
   
-  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  /**
+   * Remove all pending DN messages which reference the given DN.
+   * @param dn the datanode whose messages we should remove.
+   */
+  void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
+    for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
+        queueByBlockId.entrySet()) {
+      Queue<ReportedBlockInfo> newQueue = Lists.newLinkedList();
+      Queue<ReportedBlockInfo> oldQueue = entry.getValue();
+      while (!oldQueue.isEmpty()) {
+        ReportedBlockInfo rbi = oldQueue.remove();
+        if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
+          newQueue.add(rbi);
+        } else {
+          count--;
+        }
+      }
+      queueByBlockId.put(entry.getKey(), newQueue);
+    }
+  }
+  
+  void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, storageID, block, reportedState));
+        new ReportedBlockInfo(storageInfo, block, reportedState));
     count++;
   }
   
@@ -106,7 +123,7 @@ class PendingDataNodeMessages {
     return queue;
   }
   
-  public int count() {
+  int count() {
     return count ;
   }
 
@@ -123,7 +140,7 @@ class PendingDataNodeMessages {
     return sb.toString();
   }
 
-  public Iterable<ReportedBlockInfo> takeAll() {
+  Iterable<ReportedBlockInfo> takeAll() {
     List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity(
         count);
     for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {

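The new removeAllMessagesForDatanode rebuilds each per-block queue without the departing datanode's messages, decrementing the global count for every dropped entry. A stand-in sketch of that filtering pass (simplified types; the real code keys messages by Block and compares DatanodeDescriptors):

    import java.util.*;

    class PendingMessagesSketch {
      static class Msg { final String dn; Msg(String dn) { this.dn = dn; } }

      final Map<Long, Queue<Msg>> queueByBlockId = new HashMap<Long, Queue<Msg>>();
      int count;

      void removeAllMessagesForDatanode(String dn) {
        for (Map.Entry<Long, Queue<Msg>> e : queueByBlockId.entrySet()) {
          Queue<Msg> kept = new LinkedList<Msg>();
          Queue<Msg> old = e.getValue();
          while (!old.isEmpty()) {
            Msg m = old.remove();
            if (!m.dn.equals(dn)) {
              kept.add(m);     // keep messages from other datanodes
            } else {
              count--;         // drop and account for this node's message
            }
          }
          e.setValue(kept);
        }
      }

      public static void main(String[] args) {
        PendingMessagesSketch p = new PendingMessagesSketch();
        Queue<Msg> q = new LinkedList<Msg>(
            Arrays.asList(new Msg("dn1"), new Msg("dn2")));
        p.queueByBlockId.put(1L, q);
        p.count = 2;
        p.removeAllMessagesForDatanode("dn1");
        System.out.println(p.count); // 1
      }
    }
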
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Tue Aug 19 23:49:39 2014
@@ -92,7 +92,9 @@ public final class HdfsServerConstants {
     RECOVER  ("-recover"),
     FORCE("-force"),
     NONINTERACTIVE("-nonInteractive"),
-    RENAMERESERVED("-renameReserved");
+    RENAMERESERVED("-renameReserved"),
+    METADATAVERSION("-metadataVersion"),
+    UPGRADEONLY("-upgradeOnly");
 
    private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
         "(\\w+)\\((\\w+)\\)");

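Each StartupOption constant carries its command-line flag string, so adding -metadataVersion and -upgradeOnly is just two new constants. A minimal sketch of the constant-carries-its-flag pattern (parse() is an illustrative helper, not the real option parser):

    class StartupOptionSketch {
      enum StartupOption {
        METADATAVERSION("-metadataVersion"),
        UPGRADEONLY("-upgradeOnly");

        private final String name;  // the literal CLI flag
        StartupOption(String name) { this.name = name; }
        String getName() { return name; }

        // hypothetical helper: linear scan over the constants
        static StartupOption parse(String arg) {
          for (StartupOption o : values()) {
            if (o.name.equalsIgnoreCase(arg)) {
              return o;
            }
          }
          return null;
        }
      }

      public static void main(String[] args) {
        System.out.println(StartupOption.parse("-upgradeOnly")); // UPGRADEONLY
      }
    }
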
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Aug 19 23:49:39 2014
@@ -18,544 +18,45 @@
 
 package org.apache.hadoop.hdfs.server.common;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.jsp.JspWriter;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReader;
-import org.apache.hadoop.hdfs.BlockReaderFactory;
-import org.apache.hadoop.hdfs.ClientContext;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.RemotePeerFactory;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.DoAsParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
-import org.apache.hadoop.http.HtmlQuoting;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authorize.ProxyServers;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.VersionInfo;
 
-import com.google.common.base.Charsets;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
 
 @InterfaceAudience.Private
 public class JspHelper {
   public static final String CURRENT_CONF = "current.conf";
   public static final String DELEGATION_PARAMETER_NAME = DelegationParam.NAME;
   public static final String NAMENODE_ADDRESS = "nnaddr";
-  static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +
-                                              "=";
   private static final Log LOG = LogFactory.getLog(JspHelper.class);
 
   /** Private constructor for preventing creating JspHelper object. */
-  private JspHelper() {} 
-  
-  // data structure to count number of blocks on datanodes.
-  private static class NodeRecord extends DatanodeInfo {
-    int frequency;
-
-    public NodeRecord(DatanodeInfo info, int count) {
-      super(info);
-      this.frequency = count;
-    }
-    
-    @Override
-    public boolean equals(Object obj) {
-      // Sufficient to use super equality as datanodes are uniquely identified
-      // by DatanodeID
-      return (this == obj) || super.equals(obj);
-    }
-    @Override
-    public int hashCode() {
-      // Super implementation is sufficient
-      return super.hashCode();
-    }
-  }
-
-  // compare two records based on their frequency
-  private static class NodeRecordComparator implements Comparator<NodeRecord> {
-
-    @Override
-    public int compare(NodeRecord o1, NodeRecord o2) {
-      if (o1.frequency < o2.frequency) {
-        return -1;
-      } else if (o1.frequency > o2.frequency) {
-        return 1;
-      } 
-      return 0;
-    }
-  }
-  
-  /**
-   * convenience method for canonicalizing host name.
-   * @param addr name:port or name 
-   * @return canonicalized host name
-   */
-   public static String canonicalize(String addr) {
-    // default port 1 is supplied to allow addr without port.
-    // the port will be ignored.
-    return NetUtils.createSocketAddr(addr, 1).getAddress()
-           .getCanonicalHostName();
-  }
-
-  /**
-   * A helper class that generates the correct URL for different schema.
-   *
-   */
-  public static final class Url {
-    public static String authority(String scheme, DatanodeID d) {
-      String fqdn = (d.getIpAddr() != null && !d.getIpAddr().isEmpty())?
-          canonicalize(d.getIpAddr()): 
-          d.getHostName();
-      if (scheme.equals("http")) {
-        return fqdn + ":" + d.getInfoPort();
-      } else if (scheme.equals("https")) {
-        return fqdn + ":" + d.getInfoSecurePort();
-      } else {
-        throw new IllegalArgumentException("Unknown scheme:" + scheme);
-      }
-    }
-
-    public static String url(String scheme, DatanodeID d) {
-      return scheme + "://" + authority(scheme, d);
-    }
-  }
-
-  public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf)
-      throws IOException {
-    HashMap<DatanodeInfo, NodeRecord> map =
-      new HashMap<DatanodeInfo, NodeRecord>();
-    for (LocatedBlock block : blks.getLocatedBlocks()) {
-      DatanodeInfo[] nodes = block.getLocations();
-      for (DatanodeInfo node : nodes) {
-        NodeRecord record = map.get(node);
-        if (record == null) {
-          map.put(node, new NodeRecord(node, 1));
-        } else {
-          record.frequency++;
-        }
-      }
-    }
-    NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
-    Arrays.sort(nodes, new NodeRecordComparator());
-    return bestNode(nodes, false);
-  }
-
-  public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf)
-      throws IOException {
-    DatanodeInfo[] nodes = blk.getLocations();
-    return bestNode(nodes, true);
-  }
-
-  private static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
-      throws IOException {
-    if (nodes == null || nodes.length == 0) {
-      throw new IOException("No nodes contain this block");
-    }
-    int l = 0;
-    while (l < nodes.length && !nodes[l].isDecommissioned()) {
-      ++l;
-    }
-
-    if (l == 0) {
-      throw new IOException("No active nodes contain this block");
-    }
-
-    int index = doRandom ? DFSUtil.getRandom().nextInt(l) : 0;
-    return nodes[index];
-  }
-
-  public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
-      long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
-      long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, final Configuration conf, DFSClient.Conf dfsConf,
-      final DataEncryptionKey encryptionKey)
-          throws IOException {
-    if (chunkSizeToView == 0) return;
-    int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
-      
-    BlockReader blockReader = new BlockReaderFactory(dfsConf).
-      setInetSocketAddress(addr).
-      setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
-      setFileName(BlockReaderFactory.getFileName(addr, poolId, blockId)).
-      setBlockToken(blockToken).
-      setStartOffset(offsetIntoBlock).
-      setLength(amtToRead).
-      setVerifyChecksum(true).
-      setClientName("JspHelper").
-      setClientCacheContext(ClientContext.getFromConf(conf)).
-      setDatanodeInfo(new DatanodeInfo(
-          new DatanodeID(addr.getAddress().getHostAddress(),
-              addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))).
-      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
-      setConfiguration(conf).
-      setRemotePeerFactory(new RemotePeerFactory() {
-        @Override
-        public Peer newConnectedPeer(InetSocketAddress addr)
-            throws IOException {
-          Peer peer = null;
-          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
-          try {
-            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
-            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-            peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey);
-          } finally {
-            if (peer == null) {
-              IOUtils.closeSocket(sock);
-            }
-          }
-          return peer;
-        }
-      }).
-      build();
-
-    final byte[] buf = new byte[amtToRead];
-    try {
-      int readOffset = 0;
-      int retries = 2;
-      while (amtToRead > 0) {
-        int numRead = amtToRead;
-        try {
-          blockReader.readFully(buf, readOffset, amtToRead);
-        } catch (IOException e) {
-          retries--;
-          if (retries == 0)
-            throw new IOException("Could not read data from datanode");
-          continue;
-        }
-        amtToRead -= numRead;
-        readOffset += numRead;
-      }
-    } finally {
-      blockReader.close();
-    }
-    out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
-  }
-
-  public static void addTableHeader(JspWriter out) throws IOException {
-    out.print("<table border=\"1\""+
-              " cellpadding=\"2\" cellspacing=\"2\">");
-    out.print("<tbody>");
-  }
-  public static void addTableRow(JspWriter out, String[] columns) throws IOException {
-    out.print("<tr>");
-    for (int i = 0; i < columns.length; i++) {
-      out.print("<td style=\"vertical-align: 
top;\"><B>"+columns[i]+"</B><br></td>");
-    }
-    out.print("</tr>");
-  }
-  public static void addTableRow(JspWriter out, String[] columns, int row) throws IOException {
-    out.print("<tr>");
-      
-    for (int i = 0; i < columns.length; i++) {
-      if (row/2*2 == row) {//even
-        out.print("<td style=\"vertical-align: 
top;background-color:LightGrey;\"><B>"+columns[i]+"</B><br></td>");
-      } else {
-        out.print("<td style=\"vertical-align: 
top;background-color:LightBlue;\"><B>"+columns[i]+"</B><br></td>");
-          
-      }
-    }
-    out.print("</tr>");
-  }
-  public static void addTableFooter(JspWriter out) throws IOException {
-    out.print("</tbody></table>");
-  }
-
-  public static void sortNodeList(final List<DatanodeDescriptor> nodes,
-                           String field, String order) {
-        
-    class NodeComapare implements Comparator<DatanodeDescriptor> {
-      static final int 
-        FIELD_NAME              = 1,
-        FIELD_LAST_CONTACT      = 2,
-        FIELD_BLOCKS            = 3,
-        FIELD_CAPACITY          = 4,
-        FIELD_USED              = 5,
-        FIELD_PERCENT_USED      = 6,
-        FIELD_NONDFS_USED       = 7,
-        FIELD_REMAINING         = 8,
-        FIELD_PERCENT_REMAINING = 9,
-        FIELD_ADMIN_STATE       = 10,
-        FIELD_DECOMMISSIONED    = 11,
-        FIELD_BLOCKPOOL_USED    = 12,
-        FIELD_PERBLOCKPOOL_USED = 13,
-        FIELD_FAILED_VOLUMES    = 14,
-        SORT_ORDER_ASC          = 1,
-        SORT_ORDER_DSC          = 2;
-
-      int sortField = FIELD_NAME;
-      int sortOrder = SORT_ORDER_ASC;
-            
-      public NodeComapare(String field, String order) {
-        if (field.equals("lastcontact")) {
-          sortField = FIELD_LAST_CONTACT;
-        } else if (field.equals("capacity")) {
-          sortField = FIELD_CAPACITY;
-        } else if (field.equals("used")) {
-          sortField = FIELD_USED;
-        } else if (field.equals("nondfsused")) {
-          sortField = FIELD_NONDFS_USED;
-        } else if (field.equals("remaining")) {
-          sortField = FIELD_REMAINING;
-        } else if (field.equals("pcused")) {
-          sortField = FIELD_PERCENT_USED;
-        } else if (field.equals("pcremaining")) {
-          sortField = FIELD_PERCENT_REMAINING;
-        } else if (field.equals("blocks")) {
-          sortField = FIELD_BLOCKS;
-        } else if (field.equals("adminstate")) {
-          sortField = FIELD_ADMIN_STATE;
-        } else if (field.equals("decommissioned")) {
-          sortField = FIELD_DECOMMISSIONED;
-        } else if (field.equals("bpused")) {
-          sortField = FIELD_BLOCKPOOL_USED;
-        } else if (field.equals("pcbpused")) {
-          sortField = FIELD_PERBLOCKPOOL_USED;
-        } else if (field.equals("volfails")) {
-          sortField = FIELD_FAILED_VOLUMES;
-        } else {
-          sortField = FIELD_NAME;
-        }
-                
-        if (order.equals("DSC")) {
-          sortOrder = SORT_ORDER_DSC;
-        } else {
-          sortOrder = SORT_ORDER_ASC;
-        }
-      }
-
-      @Override
-      public int compare(DatanodeDescriptor d1,
-                         DatanodeDescriptor d2) {
-        int ret = 0;
-        switch (sortField) {
-        case FIELD_LAST_CONTACT:
-          ret = (int) (d2.getLastUpdate() - d1.getLastUpdate());
-          break;
-        case FIELD_CAPACITY:
-          long  dlong = d1.getCapacity() - d2.getCapacity();
-          ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
-          break;
-        case FIELD_USED:
-          dlong = d1.getDfsUsed() - d2.getDfsUsed();
-          ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
-          break;
-        case FIELD_NONDFS_USED:
-          dlong = d1.getNonDfsUsed() - d2.getNonDfsUsed();
-          ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
-          break;
-        case FIELD_REMAINING:
-          dlong = d1.getRemaining() - d2.getRemaining();
-          ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
-          break;
-        case FIELD_PERCENT_USED:
-          double ddbl =((d1.getDfsUsedPercent())-
-                        (d2.getDfsUsedPercent()));
-          ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0);
-          break;
-        case FIELD_PERCENT_REMAINING:
-          ddbl =((d1.getRemainingPercent())-
-                 (d2.getRemainingPercent()));
-          ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0);
-          break;
-        case FIELD_BLOCKS:
-          ret = d1.numBlocks() - d2.numBlocks();
-          break;
-        case FIELD_ADMIN_STATE:
-          ret = d1.getAdminState().toString().compareTo(
-              d2.getAdminState().toString());
-          break;
-        case FIELD_DECOMMISSIONED:
-          ret = DFSUtil.DECOM_COMPARATOR.compare(d1, d2);
-          break;
-        case FIELD_NAME: 
-          ret = d1.getHostName().compareTo(d2.getHostName());
-          break;
-        case FIELD_BLOCKPOOL_USED:
-          dlong = d1.getBlockPoolUsed() - d2.getBlockPoolUsed();
-          ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
-          break;
-        case FIELD_PERBLOCKPOOL_USED:
-          ddbl = d1.getBlockPoolUsedPercent() - d2.getBlockPoolUsedPercent();
-          ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0);
-          break;
-        case FIELD_FAILED_VOLUMES:
-          int dint = d1.getVolumeFailures() - d2.getVolumeFailures();
-          ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0);
-          break;
-        default:
-          throw new IllegalArgumentException("Invalid sortField");
-        }
-        return (sortOrder == SORT_ORDER_DSC) ? -ret : ret;
-      }
-    }
-        
-    Collections.sort(nodes, new NodeComapare(field, order));
-  }
-
-  public static void printPathWithLinks(String dir, JspWriter out, 
-                                        int namenodeInfoPort,
-                                        String tokenString,
-                                        String nnAddress
-                                        ) throws IOException {
-    try {
-      String[] parts = dir.split(Path.SEPARATOR);
-      StringBuilder tempPath = new StringBuilder(dir.length());
-      out.print("<a href=\"browseDirectory.jsp" + "?dir="+ Path.SEPARATOR
-          + "&namenodeInfoPort=" + namenodeInfoPort
-          + getDelegationTokenUrlParam(tokenString) 
-          + getUrlParam(NAMENODE_ADDRESS, nnAddress) + "\">" + Path.SEPARATOR
-          + "</a>");
-      tempPath.append(Path.SEPARATOR);
-      for (int i = 0; i < parts.length-1; i++) {
-        if (!parts[i].equals("")) {
-          tempPath.append(parts[i]);
-          out.print("<a href=\"browseDirectory.jsp" + "?dir="
-              + HtmlQuoting.quoteHtmlChars(tempPath.toString()) + "&namenodeInfoPort=" + namenodeInfoPort
-              + getDelegationTokenUrlParam(tokenString)
-              + getUrlParam(NAMENODE_ADDRESS, nnAddress));
-          out.print("\">" + HtmlQuoting.quoteHtmlChars(parts[i]) + "</a>" + 
Path.SEPARATOR);
-          tempPath.append(Path.SEPARATOR);
-        }
-      }
-      if(parts.length > 0) {
-        out.print(HtmlQuoting.quoteHtmlChars(parts[parts.length-1]));
-      }
-    }
-    catch (UnsupportedEncodingException ex) {
-      ex.printStackTrace();
-    }
-  }
-
-  public static void printGotoForm(JspWriter out,
-                                   int namenodeInfoPort,
-                                   String tokenString,
-                                   String file,
-                                   String nnAddress) throws IOException {
-    out.print("<form action=\"browseDirectory.jsp\" method=\"get\" 
name=\"goto\">");
-    out.print("Goto : ");
-    out.print("<input name=\"dir\" type=\"text\" width=\"50\" id=\"dir\" 
value=\""+ HtmlQuoting.quoteHtmlChars(file)+"\"/>");
-    out.print("<input name=\"go\" type=\"submit\" value=\"go\"/>");
-    out.print("<input name=\"namenodeInfoPort\" type=\"hidden\" "
-        + "value=\"" + namenodeInfoPort  + "\"/>");
-    if (UserGroupInformation.isSecurityEnabled()) {
-      out.print("<input name=\"" + DELEGATION_PARAMETER_NAME
-          + "\" type=\"hidden\" value=\"" + tokenString + "\"/>");
-    }
-    out.print("<input name=\""+ NAMENODE_ADDRESS +"\" type=\"hidden\" "
-        + "value=\"" + nnAddress  + "\"/>");
-    out.print("</form>");
-  }
-  
-  public static void createTitle(JspWriter out, 
-                                 HttpServletRequest req, 
-                                 String  file) throws IOException{
-    if(file == null) file = "";
-    int start = Math.max(0,file.length() - 100);
-    if(start != 0)
-      file = "..." + file.substring(start, file.length());
-    out.print("<title>HDFS:" + file + "</title>");
-  }
-
-  /** Convert a String to chunk-size-to-view. */
-  public static int string2ChunkSizeToView(String s, int defaultValue) {
-    int n = s == null? 0: Integer.parseInt(s);
-    return n > 0? n: defaultValue;
-  }
-
-  /** Return a table containing version information. */
-  public static String getVersionTable() {
-    return "<div class='dfstable'><table>"       
-        + "\n  <tr><td class='col1'>Version:</td><td>" + 
VersionInfo.getVersion() + ", " + VersionInfo.getRevision() + "</td></tr>"
-        + "\n  <tr><td class='col1'>Compiled:</td><td>" + 
VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + 
VersionInfo.getBranch() + "</td></tr>"
-        + "\n</table></div>";
-  }
-
-  /**
-   * Validate filename. 
-   * @return null if the filename is invalid.
-   *         Otherwise, return the validated filename.
-   */
-  public static String validatePath(String p) {
-    return p == null || p.length() == 0?
-        null: new Path(p).toUri().getPath();
-  }
-
-  /**
-   * Validate a long value. 
-   * @return null if the value is invalid.
-   *         Otherwise, return the validated Long object.
-   */
-  public static Long validateLong(String value) {
-    return value == null? null: Long.parseLong(value);
-  }
-
-  /**
-   * Validate a URL.
-   * @return null if the value is invalid.
-   *         Otherwise, return the validated URL String.
-   */
-  public static String validateURL(String value) {
-    try {
-      return URLEncoder.encode(new URL(value).toString(), "UTF-8");
-    } catch (IOException e) {
-      return null;
-    }
-  }
-  
-  /**
-   * If security is turned off, what is the default web user?
-   * @param conf the configuration to look in
-   * @return the remote user that was configured
-   */
-  public static UserGroupInformation getDefaultWebUser(Configuration conf
-                                                       ) throws IOException {
-    return UserGroupInformation.createRemoteUser(getDefaultWebUserName(conf));
-  }
+  private JspHelper() {}
 
   private static String getDefaultWebUserName(Configuration conf
       ) throws IOException {
@@ -690,10 +191,10 @@ public class JspHelper {
   // honor the X-Forwarded-For header set by a configured set of trusted
   // proxy servers.  allows audit logging and proxy user checks to work
   // via an http proxy
-  static String getRemoteAddr(HttpServletRequest request) {
+  public static String getRemoteAddr(HttpServletRequest request) {
     String remoteAddr = request.getRemoteAddr();
     String proxyHeader = request.getHeader("X-Forwarded-For");
-    if (proxyHeader != null && ProxyUsers.isProxyServer(remoteAddr)) {
+    if (proxyHeader != null && ProxyServers.isProxyServer(remoteAddr)) {
       final String clientAddr = proxyHeader.split(",")[0].trim();
       if (!clientAddr.isEmpty()) {
         remoteAddr = clientAddr;
@@ -736,56 +237,4 @@ public class JspHelper {
     return username;
   }
 
-  /**
-   * Returns the url parameter for the given token string.
-   * @param tokenString
-   * @return url parameter
-   */
-  public static String getDelegationTokenUrlParam(String tokenString) {
-    if (tokenString == null ) {
-      return "";
-    }
-    if (UserGroupInformation.isSecurityEnabled()) {
-      return SET_DELEGATION + tokenString;
-    } else {
-      return "";
-    }
-  }
-
-  /**
-   * Returns the url parameter for the given string, prefixed with
-   * paramSeparator.
-   * 
-   * @param name parameter name
-   * @param val parameter value
-   * @param paramSeparator URL parameter prefix, i.e. either '?' or '&'
-   * @return url parameter
-   */
-  public static String getUrlParam(String name, String val, String paramSeparator) {
-    return val == null ? "" : paramSeparator + name + "=" + val;
-  }
-  
-  /**
-   * Returns the url parameter for the given string, prefixed with '?' if
-   * firstParam is true, prefixed with '&' if firstParam is false.
-   * 
-   * @param name parameter name
-   * @param val parameter value
-   * @param firstParam true if this is the first parameter in the list, false otherwise
-   * @return url parameter
-   */
-  public static String getUrlParam(String name, String val, boolean firstParam) {
-    return getUrlParam(name, val, firstParam ? "?" : "&");
-  }
-  
-  /**
-   * Returns the url parameter for the given string, prefixed with '&'.
-   * 
-   * @param name parameter name
-   * @param val parameter value
-   * @return url parameter
-   */
-  public static String getUrlParam(String name, String val) {
-    return getUrlParam(name, val, false);
-  }
 }
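
For illustration, the getRemoteAddr logic retained above can be exercised in
isolation. The sketch below is an assumption-laden stand-in: isTrustedProxy
replaces the real ProxyServers.isProxyServer lookup, and the class name is
hypothetical.

public class ForwardedAddrSketch {
  // Assumption: one hard-coded trusted proxy; the real check consults the
  // configured proxy-server list via ProxyServers.isProxyServer.
  static boolean isTrustedProxy(String addr) {
    return "10.0.0.1".equals(addr);
  }

  // Mirrors the shape of JspHelper.getRemoteAddr: honor X-Forwarded-For only
  // when the direct peer is a trusted proxy, and take the leftmost entry,
  // which is the originating client in a "client, proxy1, proxy2" chain.
  static String effectiveRemoteAddr(String remoteAddr, String xForwardedFor) {
    if (xForwardedFor != null && isTrustedProxy(remoteAddr)) {
      String clientAddr = xForwardedFor.split(",")[0].trim();
      if (!clientAddr.isEmpty()) {
        return clientAddr;
      }
    }
    return remoteAddr;
  }

  public static void main(String[] args) {
    // Trusted peer: the forwarded client address wins.
    System.out.println(effectiveRemoteAddr("10.0.0.1", "192.168.1.7, 10.0.0.1"));
    // Untrusted peer: the header is ignored.
    System.out.println(effectiveRemoteAddr("172.16.0.9", "192.168.1.7"));
  }
}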

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
 Tue Aug 19 23:49:39 2014
@@ -831,10 +831,10 @@ public abstract class Storage extends St
   }
 
   /**
-   * Checks if the upgrade from the given old version is supported. If
-   * no upgrade is supported, it throws IncorrectVersionException.
-   * 
-   * @param oldVersion
+   * Checks if the upgrade from {@code oldVersion} is supported.
+   * @param oldVersion the version of the metadata to check with the current
+   *                   version
+   * @throws IOException if upgrade is not supported
    */
   public static void checkVersionUpgradable(int oldVersion) 
                                      throws IOException {

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
 Tue Aug 19 23:49:39 2014
@@ -148,8 +148,8 @@ public class StorageInfo {
    * Get common storage fields.
    * Should be overloaded if additional fields need to be get.
    * 
-   * @param props
-   * @throws IOException
+   * @param props properties
+   * @throws IOException on error
    */
   protected void setFieldsFromProperties(
       Properties props, StorageDirectory sd) throws IOException {

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 Tue Aug 19 23:49:39 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
    */
   private long lastActiveClaimTxId = -1;
 
+  private final ReentrantReadWriteLock mReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock mReadLock  = mReadWriteLock.readLock();
+  private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    mReadLock.lock();
+  }
+
+  void readUnlock() {
+    mReadLock.unlock();
+  }
+
+  void writeLock() {
+    mWriteLock.lock();
+  }
+
+  void writeUnlock() {
+    mWriteLock.unlock();
+  }
+
   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
@@ -135,38 +160,57 @@ class BPOfferService {
     }
     return false;
   }
-  
-  synchronized String getBlockPoolId() {
-    if (bpNSInfo != null) {
-      return bpNSInfo.getBlockPoolID();
-    } else {
-      LOG.warn("Block pool ID needed, but service not yet registered with NN",
-          new Exception("trace"));
-      return null;
+
+  String getBlockPoolId() {
+    readLock();
+    try {
+      if (bpNSInfo != null) {
+        return bpNSInfo.getBlockPoolID();
+      } else {
+        LOG.warn("Block pool ID needed, but service not yet registered with 
NN",
+            new Exception("trace"));
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
-  
-  synchronized NamespaceInfo getNamespaceInfo() {
-    return bpNSInfo;
+
+  boolean hasBlockPoolId() {
+    return getNamespaceInfo() != null;
+  }
+
+  NamespaceInfo getNamespaceInfo() {
+    readLock();
+    try {
+      return bpNSInfo;
+    } finally {
+      readUnlock();
+    }
   }
 
   @Override
-  public synchronized String toString() {
-    if (bpNSInfo == null) {
-      // If we haven't yet connected to our NN, we don't yet know our
-      // own block pool ID.
-      // If _none_ of the block pools have connected yet, we don't even
-      // know the DatanodeID of this DN.
-      String datanodeUuid = dn.getDatanodeUuid();
+  public String toString() {
+    readLock();
+    try {
+      if (bpNSInfo == null) {
+        // If we haven't yet connected to our NN, we don't yet know our
+        // own block pool ID.
+        // If _none_ of the block pools have connected yet, we don't even
+        // know the DatanodeID of this DN.
+        String datanodeUuid = dn.getDatanodeUuid();
 
-      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
-        datanodeUuid = "unassigned";
+        if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+          datanodeUuid = "unassigned";
+        }
+        return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+      } else {
+        return "Block pool " + getBlockPoolId() +
+            " (Datanode Uuid " + dn.getDatanodeUuid() +
+            ")";
       }
-      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
-    } else {
-      return "Block pool " + getBlockPoolId() +
-          " (Datanode Uuid " + dn.getDatanodeUuid() +
-          ")";
+    } finally {
+      readUnlock();
     }
   }
   
@@ -262,32 +306,37 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
-    if (this.bpNSInfo == null) {
-      this.bpNSInfo = nsInfo;
-      boolean success = false;
-
-      // Now that we know the namespace ID, etc, we can pass this to the DN.
-      // The DN can now initialize its local storage if we are the
-      // first BP to handshake, etc.
-      try {
-        dn.initBlockPool(this);
-        success = true;
-      } finally {
-        if (!success) {
-          // The datanode failed to initialize the BP. We need to reset
-          // the namespace info so that other BPService actors still have
-          // a chance to set it, and re-initialize the datanode.
-          this.bpNSInfo = null;
+  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    writeLock();
+    try {
+      if (this.bpNSInfo == null) {
+        this.bpNSInfo = nsInfo;
+        boolean success = false;
+
+        // Now that we know the namespace ID, etc, we can pass this to the DN.
+        // The DN can now initialize its local storage if we are the
+        // first BP to handshake, etc.
+        try {
+          dn.initBlockPool(this);
+          success = true;
+        } finally {
+          if (!success) {
+            // The datanode failed to initialize the BP. We need to reset
+            // the namespace info so that other BPService actors still have
+            // a chance to set it, and re-initialize the datanode.
+            this.bpNSInfo = null;
+          }
         }
+      } else {
+        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+            "Blockpool ID");
+        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+            "Namespace ID");
+        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+            "Cluster ID");
       }
-    } else {
-      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
-          "Blockpool ID");
-      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
-          "Namespace ID");
-      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
-          "Cluster ID");
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -296,22 +345,27 @@ class BPOfferService {
    * NN, it calls this function to verify that the NN it connected to
    * is consistent with other NNs serving the block-pool.
    */
-  synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+  void registrationSucceeded(BPServiceActor bpServiceActor,
       DatanodeRegistration reg) throws IOException {
-    if (bpRegistration != null) {
-      checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
-          reg.getStorageInfo().getNamespaceID(), "namespace ID");
-      checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
-          reg.getStorageInfo().getClusterID(), "cluster ID");
-    } else {
-      bpRegistration = reg;
-    }
-    
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-    // Add the initial block token secret keys to the DN's secret manager.
-    if (dn.isBlockTokenEnabled) {
-      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
-          reg.getExportedKeys());
+    writeLock();
+    try {
+      if (bpRegistration != null) {
+        checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+            reg.getStorageInfo().getNamespaceID(), "namespace ID");
+        checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+            reg.getStorageInfo().getClusterID(), "cluster ID");
+      } else {
+        bpRegistration = reg;
+      }
+
+      dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+      // Add the initial block token secret keys to the DN's secret manager.
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+            reg.getExportedKeys());
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -329,25 +383,35 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
-    Preconditions.checkState(bpNSInfo != null,
-        "getRegistration() can only be called after initial handshake");
-    return dn.createBPRegistration(bpNSInfo);
+  DatanodeRegistration createRegistration() {
+    writeLock();
+    try {
+      Preconditions.checkState(bpNSInfo != null,
+          "getRegistration() can only be called after initial handshake");
+      return dn.createBPRegistration(bpNSInfo);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Called when an actor shuts down. If this is the last actor
    * to shut down, shuts down the whole blockpool in the DN.
    */
-  synchronized void shutdownActor(BPServiceActor actor) {
-    if (bpServiceToActive == actor) {
-      bpServiceToActive = null;
-    }
+  void shutdownActor(BPServiceActor actor) {
+    writeLock();
+    try {
+      if (bpServiceToActive == actor) {
+        bpServiceToActive = null;
+      }
 
-    bpServices.remove(actor);
+      bpServices.remove(actor);
 
-    if (bpServices.isEmpty()) {
-      dn.shutdownBlockPool(this);
+      if (bpServices.isEmpty()) {
+        dn.shutdownBlockPool(this);
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -388,11 +452,16 @@ class BPOfferService {
    * @return a proxy to the active NN, or null if the BPOS has not
    * acknowledged any NN as active yet.
    */
-  synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
-    if (bpServiceToActive != null) {
-      return bpServiceToActive.bpNamenode;
-    } else {
-      return null;
+  DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+    readLock();
+    try {
+      if (bpServiceToActive != null) {
+        return bpServiceToActive.bpNamenode;
+      } else {
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -420,45 +489,50 @@ class BPOfferService {
    * @param actor the actor which received the heartbeat
    * @param nnHaState the HA-related heartbeat contents
    */
-  synchronized void updateActorStatesFromHeartbeat(
+  void updateActorStatesFromHeartbeat(
       BPServiceActor actor,
       NNHAStatusHeartbeat nnHaState) {
-    final long txid = nnHaState.getTxId();
-    
-    final boolean nnClaimsActive =
-      nnHaState.getState() == HAServiceState.ACTIVE;
-    final boolean bposThinksActive = bpServiceToActive == actor;
-    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; 
-    
-    if (nnClaimsActive && !bposThinksActive) {
-      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
-          "txid=" + txid);
-      if (!isMoreRecentClaim) {
-        // Split-brain scenario - an NN is trying to claim active
-        // state when a different NN has already claimed it with a higher
-        // txid.
-        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
-            txid + " but there was already a more recent claim at txid=" +
-            lastActiveClaimTxId);
-        return;
-      } else {
-        if (bpServiceToActive == null) {
-          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+    writeLock();
+    try {
+      final long txid = nnHaState.getTxId();
+
+      final boolean nnClaimsActive =
+          nnHaState.getState() == HAServiceState.ACTIVE;
+      final boolean bposThinksActive = bpServiceToActive == actor;
+      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+      if (nnClaimsActive && !bposThinksActive) {
+        LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+            "txid=" + txid);
+        if (!isMoreRecentClaim) {
+          // Split-brain scenario - an NN is trying to claim active
+          // state when a different NN has already claimed it with a higher
+          // txid.
+          LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+              txid + " but there was already a more recent claim at txid=" +
+              lastActiveClaimTxId);
+          return;
         } else {
-          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
-              bpServiceToActive + " at higher txid=" + txid);
+          if (bpServiceToActive == null) {
+            LOG.info("Acknowledging ACTIVE Namenode " + actor);
+          } else {
+            LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+                bpServiceToActive + " at higher txid=" + txid);
+          }
+          bpServiceToActive = actor;
         }
-        bpServiceToActive = actor;
+      } else if (!nnClaimsActive && bposThinksActive) {
+        LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+            "txid=" + nnHaState.getTxId());
+        bpServiceToActive = null;
       }
-    } else if (!nnClaimsActive && bposThinksActive) {
-      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
-          "txid=" + nnHaState.getTxId());
-      bpServiceToActive = null;
-    }
-    
-    if (bpServiceToActive == actor) {
-      assert txid >= lastActiveClaimTxId;
-      lastActiveClaimTxId = txid;
+
+      if (bpServiceToActive == actor) {
+        assert txid >= lastActiveClaimTxId;
+        lastActiveClaimTxId = txid;
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -527,14 +601,17 @@ class BPOfferService {
       LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
           + " with " + actor.state + " state");
       actor.reRegister();
-      return true;
+      return false;
     }
-    synchronized (this) {
+    writeLock();
+    try {
       if (actor == bpServiceToActive) {
         return processCommandFromActive(cmd, actor);
       } else {
         return processCommandFromStandby(cmd, actor);
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -571,7 +648,8 @@ class BPOfferService {
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
       // Send a copy of a block to another datanode
-      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
+          bcmd.getTargets(), bcmd.getTargetStorageTypes());
       dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
@@ -679,4 +757,17 @@ class BPOfferService {
     return true;
   }
 
+  /*
+   * Let the actor retry initialization until all namenodes of the cluster
+   * have failed.
+   */
+  boolean shouldRetryInit() {
+    if (hasBlockPoolId()) {
+      // One of the namenodes registered successfully, so keep retrying for
+      // the others.
+      return true;
+    }
+    return isAlive();
+  }
+
 }
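
The BPOfferService changes above swap method-level synchronized for an
explicit ReentrantReadWriteLock, so the read-mostly accessors
(getBlockPoolId, getNamespaceInfo, toString, getActiveNN) no longer block one
another. Below is a minimal standalone sketch of the lock-in-try/finally
pattern the patch applies; the class and field names are illustrative only.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RwLockSketch {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock();
  private final Lock writeLock = rwLock.writeLock();
  private String state; // guarded by rwLock, like bpNSInfo in the patch

  String read() {
    readLock.lock();
    try {
      return state;      // many readers may hold the read lock concurrently
    } finally {
      readLock.unlock(); // always release in finally, as the patch does
    }
  }

  void write(String newState) {
    writeLock.lock();
    try {
      state = newState;  // the writer has exclusive access
    } finally {
      writeLock.unlock();
    }
  }
}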

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
 Tue Aug 19 23:49:39 2014
@@ -90,8 +90,13 @@ class BPServiceActor implements Runnable
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
   private volatile long lastHeartbeat = 0;
-  private volatile boolean initialized = false;
-  
+
+  static enum RunningState {
+    CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
+  }
+
+  private volatile RunningState runningState = RunningState.CONNECTING;
+
   /**
    * Between block reports (which happen on the order of once an hour) the
    * DN reports smaller incremental changes to its block list. This map,
@@ -118,17 +123,12 @@ class BPServiceActor implements Runnable
     this.dnConf = dn.getDnConf();
   }
 
-  /**
-   * returns true if BP thread has completed initialization of storage
-   * and has registered with the corresponding namenode
-   * @return true if initialized
-   */
-  boolean isInitialized() {
-    return initialized;
-  }
-  
   boolean isAlive() {
-    return shouldServiceRun && bpThread.isAlive();
+    if (!shouldServiceRun || !bpThread.isAlive()) {
+      return false;
+    }
+    return runningState == BPServiceActor.RunningState.RUNNING
+        || runningState == BPServiceActor.RunningState.CONNECTING;
   }
 
   @Override
@@ -222,7 +222,19 @@ class BPServiceActor implements Runnable
     // Second phase of the handshake with the NN.
     register();
   }
-  
+
+  // This is useful to make sure the NN gets a Heartbeat before a Blockreport
+  // upon NN restart while the DN keeps retrying. Otherwise:
+  // 1. NN restarts.
+  // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
+  // 3. After reregistration completes, DN will send Blockreport first.
+  // 4. Given NN receives Blockreport after Heartbeat, it won't mark
+  //    DatanodeStorageInfo#blockContentsStale to false until the next
+  //    Blockreport.
+  void scheduleHeartbeat() {
+    lastHeartbeat = 0;
+  }
+
   /**
   * This method arranges for the data node to send the block report at 
    * the next heartbeat.
@@ -314,9 +326,7 @@ class BPServiceActor implements Runnable
   }
 
   /**
-   * Retrieve the incremental BR state for a given storage UUID
-   * @param storageUuid
-   * @return
+   * @return pending incremental block report for given {@code storage}
    */
   private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
       DatanodeStorage storage) {
@@ -339,8 +349,6 @@ class BPServiceActor implements Runnable
    * exists for the same block it is removed.
    *
    * Caller must synchronize access using pendingIncrementalBRperStorage.
-   * @param bInfo
-   * @param storageUuid
    */
   void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
       DatanodeStorage storage) {
@@ -809,19 +817,30 @@ class BPServiceActor implements Runnable
     LOG.info(this + " starting to offer service");
 
     try {
-      // init stuff
-      try {
-        // setup storage
-        connectToNNAndHandshake();
-      } catch (IOException ioe) {
-        // Initial handshake, storage recovery or registration failed
-        // End BPOfferService thread
-        LOG.fatal("Initialization failed for block pool " + this, ioe);
-        return;
+      while (true) {
+        // init stuff
+        try {
+          // setup storage
+          connectToNNAndHandshake();
+          break;
+        } catch (IOException ioe) {
+          // Initial handshake, storage recovery or registration failed
+          runningState = RunningState.INIT_FAILED;
+          if (shouldRetryInit()) {
+            // Retry until all namenodes of the BPOS have failed initialization
+            LOG.error("Initialization failed for " + this + " "
+                + ioe.getLocalizedMessage());
+            sleepAndLogInterrupts(5000, "initializing");
+          } else {
+            runningState = RunningState.FAILED;
+            LOG.fatal("Initialization failed for " + this + ". Exiting. ", 
ioe);
+            return;
+          }
+        }
       }
 
-      initialized = true; // bp is initialized;
-      
+      runningState = RunningState.RUNNING;
+
       while (shouldRun()) {
         try {
           offerService();
@@ -830,14 +849,20 @@ class BPServiceActor implements Runnable
           sleepAndLogInterrupts(5000, "offering service");
         }
       }
+      runningState = RunningState.EXITED;
     } catch (Throwable ex) {
       LOG.warn("Unexpected exception in block pool " + this, ex);
+      runningState = RunningState.FAILED;
     } finally {
       LOG.warn("Ending block pool service for: " + this);
       cleanUp();
     }
   }
 
+  private boolean shouldRetryInit() {
+    return shouldRun() && bpos.shouldRetryInit();
+  }
+
   private boolean shouldRun() {
     return shouldServiceRun && dn.shouldRun();
   }
@@ -889,6 +914,7 @@ class BPServiceActor implements Runnable
       retrieveNamespaceInfo();
       // and re-register
       register();
+      scheduleHeartbeat();
     }
   }
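
The run() change above replaces a single connect attempt with a retry loop
driven by the new RunningState enum and bpos.shouldRetryInit(). A condensed
sketch of that control flow follows, under stated assumptions: connectOnce
and shouldRetry are hypothetical stand-ins for connectToNNAndHandshake and
shouldRetryInit, and the sleep is shortened from the patch's 5000 ms.

import java.io.IOException;

class InitRetrySketch {
  enum RunningState { CONNECTING, INIT_FAILED, RUNNING, FAILED }

  private volatile RunningState runningState = RunningState.CONNECTING;
  private int attempts;

  private void connectOnce() throws IOException {
    if (++attempts < 3) {              // simulate two failed handshakes
      throw new IOException("NN not reachable yet");
    }
  }

  private boolean shouldRetry() {
    return true;                       // e.g. another NN already registered
  }

  void init() throws InterruptedException {
    while (true) {
      try {
        connectOnce();
        break;                         // handshake succeeded
      } catch (IOException ioe) {
        runningState = RunningState.INIT_FAILED;
        if (!shouldRetry()) {
          runningState = RunningState.FAILED;
          return;                      // give up; the actor exits
        }
        Thread.sleep(50);              // retry after a pause
      }
    }
    runningState = RunningState.RUNNING;
  }

  public static void main(String[] args) throws InterruptedException {
    InitRetrySketch s = new InitRetrySketch();
    s.init();
    System.out.println(s.runningState); // RUNNING after the third attempt
  }
}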
 

