Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Thu Aug  7 07:38:23 2014
@@ -33,6 +33,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
 
 import java.io.IOException;
 import java.io.PrintStream;
@@ -1531,16 +1534,38 @@ public class DFSUtil {
         .needsClientAuth(
             sslConf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
                 DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT))
-        .keyPassword(sslConf.get("ssl.server.keystore.keypassword"))
+        .keyPassword(getPassword(sslConf, DFS_SERVER_HTTPS_KEYPASSWORD_KEY))
         .keyStore(sslConf.get("ssl.server.keystore.location"),
-            sslConf.get("ssl.server.keystore.password"),
+            getPassword(sslConf, DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY),
             sslConf.get("ssl.server.keystore.type", "jks"))
         .trustStore(sslConf.get("ssl.server.truststore.location"),
-            sslConf.get("ssl.server.truststore.password"),
+            getPassword(sslConf, DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY),
             sslConf.get("ssl.server.truststore.type", "jks"));
   }
 
   /**
+   * Leverages the Configuration.getPassword method to attempt to get
+   * passwords from the CredentialProvider API before falling back to
+   * clear text in config - if falling back is allowed.
+   * @param conf Configuration instance
+   * @param alias name of the credential to retrieve
+   * @return String credential value or null
+   */
+  static String getPassword(Configuration conf, String alias) {
+    String password = null;
+    try {
+      char[] passchars = conf.getPassword(alias);
+      if (passchars != null) {
+        password = new String(passchars);
+      }
+    }
+    catch (IOException ioe) {
+      password = null;
+    }
+    return password;
+  }
+
+  /**
    * Converts a Date into an ISO-8601 formatted datetime string.
    */
   public static String dateToIso8601String(Date date) {

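For context, the new DFSUtil.getPassword() helper delegates to Configuration.getPassword(), which consults the CredentialProvider API first and only falls back to the clear-text config value when fallback is allowed. A minimal sketch of that lookup (the provider path below is illustrative, not part of this change):

    import org.apache.hadoop.conf.Configuration;

    public class CredentialLookupDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative provider URI; any registered CredentialProvider works.
        conf.set("hadoop.security.credential.provider.path",
            "jceks://file/tmp/ssl-server.jceks");
        // Checks the provider(s) first, then falls back to the clear-text
        // value of the key if fallback is permitted.
        char[] pass = conf.getPassword("ssl.server.keystore.keypassword");
        System.out.println(pass == null ? "no credential" : "credential found");
      }
    }
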
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Thu Aug  7 07:38:23 2014
@@ -50,6 +50,9 @@ public class Block implements Writable, 
   public static final Pattern metaFilePattern = Pattern
       .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION
           + "$");
+  public static final Pattern metaOrBlockFilePattern = Pattern
+      .compile(BLOCK_FILE_PREFIX + "(-??\\d++)(_(\\d++)\\" + METADATA_EXTENSION
+          + ")?$");
 
   public static boolean isBlockFilename(File f) {
     String name = f.getName();
@@ -65,6 +68,11 @@ public class Block implements Writable, 
     return metaFilePattern.matcher(name).matches();
   }
 
+  public static File metaToBlockFile(File metaFile) {
+    return new File(metaFile.getParent(), metaFile.getName().substring(
+        0, metaFile.getName().lastIndexOf('_')));
+  }
+
   /**
    * Get generation stamp from the name of the metafile name
    */
@@ -75,10 +83,10 @@ public class Block implements Writable, 
   }
 
   /**
-   * Get the blockId from the name of the metafile name
+   * Get the blockId from the name of the meta or block file
    */
-  public static long getBlockId(String metaFile) {
-    Matcher m = metaFilePattern.matcher(metaFile);
+  public static long getBlockId(String metaOrBlockFile) {
+    Matcher m = metaOrBlockFilePattern.matcher(metaOrBlockFile);
     return m.matches() ? Long.parseLong(m.group(1)) : 0;
   }
 

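As a quick illustration of the new metaOrBlockFilePattern (assuming the HDFS defaults BLOCK_FILE_PREFIX = "blk_" and METADATA_EXTENSION = ".meta"), getBlockId() now accepts both block and meta file names, and metaToBlockFile() strips everything from the last '_' onward:

    import java.io.File;
    import org.apache.hadoop.hdfs.protocol.Block;

    public class BlockNameDemo {
      public static void main(String[] args) {
        // A bare block file name now matches...
        System.out.println(Block.getBlockId("blk_1073741825"));            // 1073741825
        // ...and so does a meta file name, as before.
        System.out.println(Block.getBlockId("blk_1073741825_1001.meta"));  // 1073741825
        // Recover the block file backing a meta file.
        File meta = new File("/data/current/blk_1073741825_1001.meta");
        System.out.println(Block.metaToBlockFile(meta));  // /data/current/blk_1073741825
      }
    }
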
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Aug  7 07:38:23 2014
@@ -58,6 +58,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -85,7 +86,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
@@ -195,10 +195,12 @@ import com.google.common.base.Preconditi
 @InterfaceAudience.Private
 public class Balancer {
   static final Log LOG = LogFactory.getLog(Balancer.class);
-  final private static long GB = 1L << 30; //1GB
-  final private static long MAX_SIZE_TO_MOVE = 10*GB;
-  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
-  private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
+
+  private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+
+  private static final long GB = 1L << 30; //1GB
+  private static final long MAX_SIZE_TO_MOVE = 10*GB;
+  private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
 
   /** The maximum number of concurrent block moves for 
    * balancing purposes at a datanode
@@ -219,6 +221,8 @@ public class Balancer {
       + "\tIncludes only the specified datanodes.";
   
   private final NameNodeConnector nnc;
+  private final KeyManager keyManager;
+
   private final BalancingPolicy policy;
   private final SaslDataTransferClient saslClient;
   private final double threshold;
@@ -241,7 +245,8 @@ public class Balancer {
   
   private final Map<Block, BalancerBlock> globalBlockList
                  = new HashMap<Block, BalancerBlock>();
-  private final MovedBlocks movedBlocks = new MovedBlocks();
+  private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
+
   /** Map (datanodeUuid,storageType -> StorageGroup) */
   private final StorageGroupMap storageGroupMap = new StorageGroupMap();
   
@@ -326,7 +331,7 @@ public class Balancer {
           if (isGoodBlockCandidate(source, target, block)) {
             this.block = block;
             if ( chooseProxySource() ) {
-              movedBlocks.add(block);
+              movedBlocks.put(block);
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Decided to move " + this);
               }
@@ -399,10 +404,10 @@ public class Balancer {
         
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
-        ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
-        Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+        ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
+        Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, nnc, accessToken, target.getDatanode());
+          unbufIn, keyManager, accessToken, target.getDatanode());
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@@ -483,47 +488,9 @@ public class Balancer {
   }
   
   /* A class for keeping track of blocks in the Balancer */
-  static private class BalancerBlock {
-    private final Block block; // the block
-    /** The locations of the replicas of the block. */
-    private final List<BalancerDatanode.StorageGroup> locations
-        = new ArrayList<BalancerDatanode.StorageGroup>(3);
-    
-    /* Constructor */
-    private BalancerBlock(Block block) {
-      this.block = block;
-    }
-    
-    /* clean block locations */
-    private synchronized void clearLocations() {
-      locations.clear();
-    }
-    
-    /* add a location */
-    private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
-      if (!locations.contains(g)) {
-        locations.add(g);
-      }
-    }
-    
-    /** @return if the block is located on the given storage group. */
-    private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
-      return locations.contains(g);
-    }
-    
-    /* Return its locations */
-    private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
-      return locations;
-    }
-    
-    /* Return the block */
-    private Block getBlock() {
-      return block;
-    }
-    
-    /* Return the length of the block */
-    private long getNumBytes() {
-      return block.getNumBytes();
+  static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
+    BalancerBlock(Block block) {
+      super(block);
     }
   }
   
@@ -735,7 +702,7 @@ public class Balancer {
      */
     private long getBlockList() throws IOException {
       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
-      final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(
+      final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks(
           getDatanode(), size).getBlocks();
 
       long bytesReceived = 0;
@@ -819,7 +786,7 @@ public class Balancer {
     private void filterMovedBlocks() {
       for (Iterator<BalancerBlock> blocks=getBlockIterator();
             blocks.hasNext();) {
-        if (movedBlocks.contains(blocks.next())) {
+        if (movedBlocks.contains(blocks.next().getBlock())) {
           blocks.remove();
         }
       }
@@ -925,6 +892,13 @@ public class Balancer {
     this.nodesToBeExcluded = p.nodesToBeExcluded;
     this.nodesToBeIncluded = p.nodesToBeIncluded;
     this.nnc = theblockpool;
+    this.keyManager = nnc.getKeyManager();
+    
+    final long movedWinWidth = conf.getLong(
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
+        DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+    movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
+
     cluster = NetworkTopology.getInstance(conf);
 
     this.moverExecutor = Executors.newFixedThreadPool(
@@ -1094,36 +1068,6 @@ public class Balancer {
     LOG.info(items.size() + " " + name + ": " + items);
   }
 
-  /** A matcher interface for matching nodes. */
-  private interface Matcher {
-    /** Given the cluster topology, does the left node match the right node? */
-    boolean match(NetworkTopology cluster, Node left,  Node right);
-  }
-
-  /** Match datanodes in the same node group. */
-  static final Matcher SAME_NODE_GROUP = new Matcher() {
-    @Override
-    public boolean match(NetworkTopology cluster, Node left, Node right) {
-      return cluster.isOnSameNodeGroup(left, right);
-    }
-  };
-
-  /** Match datanodes in the same rack. */
-  static final Matcher SAME_RACK = new Matcher() {
-    @Override
-    public boolean match(NetworkTopology cluster, Node left, Node right) {
-      return cluster.isOnSameRack(left, right);
-    }
-  };
-
-  /** Match any datanode with any other datanode. */
-  static final Matcher ANY_OTHER = new Matcher() {
-    @Override
-    public boolean match(NetworkTopology cluster, Node left, Node right) {
-      return left != right;
-    }
-  };
-
   /**
    * Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
@@ -1134,13 +1078,13 @@ public class Balancer {
   private long chooseStorageGroups() {
     // First, match nodes on the same node group if cluster is node group aware
     if (cluster.isNodeGroupAware()) {
-      chooseStorageGroups(SAME_NODE_GROUP);
+      chooseStorageGroups(Matcher.SAME_NODE_GROUP);
     }
     
     // Then, match nodes on the same rack
-    chooseStorageGroups(SAME_RACK);
+    chooseStorageGroups(Matcher.SAME_RACK);
     // At last, match all remaining nodes
-    chooseStorageGroups(ANY_OTHER);
+    chooseStorageGroups(Matcher.ANY_OTHER);
     
     Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
         "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
@@ -1307,56 +1251,6 @@ public class Balancer {
     } while (shouldWait);
   }
 
-  /** This window makes sure to keep blocks that have been moved within 1.5 hour.
-   * Old window has blocks that are older;
-   * Current window has blocks that are more recent;
-   * Cleanup method triggers the check if blocks in the old window are
-   * more than 1.5 hour old. If yes, purge the old window and then
-   * move blocks in current window to old window.
-   */ 
-  private static class MovedBlocks {
-    private long lastCleanupTime = Time.now();
-    final private static int CUR_WIN = 0;
-    final private static int OLD_WIN = 1;
-    final private static int NUM_WINS = 2;
-    final private List<HashMap<Block, BalancerBlock>> movedBlocks = 
-      new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
-    
-    /* initialize the moved blocks collection */
-    private MovedBlocks() {
-      movedBlocks.add(new HashMap<Block,BalancerBlock>());
-      movedBlocks.add(new HashMap<Block,BalancerBlock>());
-    }
-
-    /* add a block thus marking a block to be moved */
-    synchronized private void add(BalancerBlock block) {
-      movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
-    }
-
-    /* check if a block is marked as moved */
-    synchronized private boolean contains(BalancerBlock block) {
-      return contains(block.getBlock());
-    }
-
-    /* check if a block is marked as moved */
-    synchronized private boolean contains(Block block) {
-      return movedBlocks.get(CUR_WIN).containsKey(block) ||
-        movedBlocks.get(OLD_WIN).containsKey(block);
-    }
-
-    /* remove old blocks */
-    synchronized private void cleanup() {
-      long curTime = Time.now();
-      // check if old win is older than winWidth
-      if (lastCleanupTime + WIN_WIDTH <= curTime) {
-        // purge the old window
-        movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
-        movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
-        lastCleanupTime = curTime;
-      }
-    }
-  }
-
   /* Decide if it is OK to move the given block from source to target
    * A block is a good candidate if
    * 1. the block is not in the process of being moved/has not been moved;
@@ -1369,7 +1263,7 @@ public class Balancer {
       return false;
     }
     // check if the block is moved or not
-    if (movedBlocks.contains(block)) {
+    if (movedBlocks.contains(block.getBlock())) {
       return false;
     }
     if (block.isLocatedOn(target)) {
@@ -1387,7 +1281,7 @@ public class Balancer {
     } else {
       boolean notOnSameRack = true;
       synchronized (block) {
-        for (BalancerDatanode.StorageGroup loc : block.locations) {
+        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
           if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
             notOnSameRack = false;
             break;
@@ -1399,7 +1293,7 @@ public class Balancer {
         goodBlock = true;
       } else {
         // good if source is on the same rack as one of the replicas
-        for (BalancerDatanode.StorageGroup loc : block.locations) {
+        for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
           if (loc != source && 
               cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
             goodBlock = true;
@@ -1425,7 +1319,7 @@ public class Balancer {
   private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
       BalancerBlock block, Source source) {
     final DatanodeInfo targetDn = target.getDatanode();
-    for (BalancerDatanode.StorageGroup loc : block.locations) {
+    for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
       if (loc != source && 
           cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
         return true;
@@ -1489,7 +1383,7 @@ public class Balancer {
        * decide the number of bytes need to be moved
        */
       final long bytesLeftToMove = init(
-          nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
+          nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE));
       if (bytesLeftToMove == 0) {
         System.out.println("The cluster is balanced. Exiting...");
         return ReturnStatus.SUCCESS;
@@ -1558,8 +1452,8 @@ public class Balancer {
     final long sleeptime = 2000*conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
-    LOG.info("namenodes = " + namenodes);
-    LOG.info("p         = " + p);
+    LOG.info("namenodes  = " + namenodes);
+    LOG.info("parameters = " + p);
     
     final Formatter formatter = new Formatter(System.out);
     System.out.println("Time Stamp               Iteration#  Bytes Already 
Moved  Bytes Left To Move  Bytes Being Moved");
@@ -1568,7 +1462,10 @@ public class Balancer {
         = new ArrayList<NameNodeConnector>(namenodes.size());
     try {
       for (URI uri : namenodes) {
-        connectors.add(new NameNodeConnector(uri, conf));
+        final NameNodeConnector nnc = new NameNodeConnector(
+            Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
+        nnc.getKeyManager().startBlockKeyUpdater();
+        connectors.add(nnc);
       }
     
       boolean done = false;
@@ -1730,9 +1627,6 @@ public class Balancer {
     public int run(String[] args) {
       final long startTime = Time.now();
       final Configuration conf = getConf();
-      WIN_WIDTH = conf.getLong(
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 
-          DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
 
       try {
         checkReplicationPolicyCompatibility(conf);
@@ -1761,9 +1655,9 @@ public class Balancer {
       if (args != null) {
         try {
           for(int i = 0; i < args.length; i++) {
-            checkArgument(args.length >= 2, "args = " + Arrays.toString(args));
             if ("-threshold".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                "Threshold value is missing: args = " + Arrays.toString(args));
               try {
                 threshold = Double.parseDouble(args[i]);
                 if (threshold < 1 || threshold > 100) {
@@ -1778,7 +1672,8 @@ public class Balancer {
                 throw e;
               }
             } else if ("-policy".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                "Policy value is missing: args = " + Arrays.toString(args));
               try {
                 policy = BalancingPolicy.parse(args[i]);
               } catch(IllegalArgumentException e) {
@@ -1786,16 +1681,26 @@ public class Balancer {
                 throw e;
               }
             } else if ("-exclude".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                  "List of nodes to exclude | -f <filename> is missing: args = 
"
+                  + Arrays.toString(args));
               if ("-f".equalsIgnoreCase(args[i])) {
-                nodesTobeExcluded = Util.getHostListFromFile(args[++i]);
+                checkArgument(++i < args.length,
+                    "File containing nodes to exclude is not specified: args = 
"
+                    + Arrays.toString(args));
+                nodesTobeExcluded = Util.getHostListFromFile(args[i]);
               } else {
                 nodesTobeExcluded = Util.parseHostList(args[i]);
               }
             } else if ("-include".equalsIgnoreCase(args[i])) {
-              i++;
+              checkArgument(++i < args.length,
+                "List of nodes to include | -f <filename> is missing: args = "
+                + Arrays.toString(args));
               if ("-f".equalsIgnoreCase(args[i])) {
-                nodesTobeIncluded = Util.getHostListFromFile(args[++i]);
+                checkArgument(++i < args.length,
+                    "File containing nodes to include is not specified: args = 
"
+                    + Arrays.toString(args));
+                nodesTobeIncluded = Util.getHostListFromFile(args[i]);
                } else {
                 nodesTobeIncluded = Util.parseHostList(args[i]);
               }
@@ -1804,12 +1709,8 @@ public class Balancer {
                   + Arrays.toString(args));
             }
           }
-          if (!nodesTobeExcluded.isEmpty() && !nodesTobeIncluded.isEmpty()) {
-            System.err.println(
-                "-exclude and -include options cannot be specified together.");
-            throw new IllegalArgumentException(
-                "-exclude and -include options cannot be specified together.");
-          }
+          checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
+              "-exclude and -include options cannot be specified together.");
         } catch(RuntimeException e) {
           printUsage(System.err);
           throw e;

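The option-parsing rework above replaces the blanket `args.length >= 2` check with a per-flag bounds check before each value is consumed, so a missing value now produces a targeted error. A standalone sketch of the pattern (class and method names here are illustrative, not from the patch):

    import static com.google.common.base.Preconditions.checkArgument;

    import java.util.Arrays;

    public class FlagParseDemo {
      // Advance past the flag, verify its value exists, then parse it.
      static double parseThreshold(String[] args, int i) {
        checkArgument(++i < args.length,
            "Threshold value is missing: args = " + Arrays.toString(args));
        final double threshold = Double.parseDouble(args[i]);
        checkArgument(threshold >= 1 && threshold <= 100,
            "Number out of range: threshold = " + threshold);
        return threshold;
      }

      public static void main(String[] args) {
        // prints 10.0
        System.out.println(parseThreshold(new String[] {"-threshold", "10"}, 0));
      }
    }
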
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Thu Aug  7 07:38:23 2014
@@ -17,113 +17,96 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
-import java.util.EnumSet;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
 
 /**
- * The class provides utilities for {@link Balancer} to access a NameNode
+ * The class provides utilities for accessing a NameNode.
  */
 @InterfaceAudience.Private
-class NameNodeConnector implements DataEncryptionKeyFactory {
-  private static final Log LOG = Balancer.LOG;
-  private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+public class NameNodeConnector implements Closeable {
+  private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
+
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
 
-  final URI nameNodeUri;
-  final String blockpoolID;
+  private final URI nameNodeUri;
+  private final String blockpoolID;
+
+  private final NamenodeProtocol namenode;
+  private final ClientProtocol client;
+  private final KeyManager keyManager;
+
+  private final FileSystem fs;
+  private final Path idPath;
+  private final OutputStream out;
 
-  final NamenodeProtocol namenode;
-  final ClientProtocol client;
-  final FileSystem fs;
-  final OutputStream out;
-
-  private final boolean isBlockTokenEnabled;
-  private final boolean encryptDataTransfer;
-  private boolean shouldRun;
-  private long keyUpdaterInterval;
-  // used for balancer
   private int notChangedIterations = 0;
-  private BlockTokenSecretManager blockTokenSecretManager;
-  private Daemon keyupdaterthread; // AccessKeyUpdater thread
-  private DataEncryptionKey encryptionKey;
 
-  NameNodeConnector(URI nameNodeUri,
+  public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
       Configuration conf) throws IOException {
     this.nameNodeUri = nameNodeUri;
+    this.idPath = idPath;
     
-    this.namenode =
-      NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
-        .getProxy();
-    this.client =
-      NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
-        .getProxy();
+    this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
+        NamenodeProtocol.class).getProxy();
+    this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
+        ClientProtocol.class).getProxy();
     this.fs = FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
     this.blockpoolID = namespaceinfo.getBlockPoolID();
 
-    final ExportedBlockKeys keys = namenode.getBlockKeys();
-    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
-    if (isBlockTokenEnabled) {
-      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long blockTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Block token params received from NN: keyUpdateInterval="
-          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + blockTokenLifetime / (60 * 1000) + " min(s)");
-      String encryptionAlgorithm = conf.get(
-          DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
-      this.blockTokenSecretManager = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
-          encryptionAlgorithm);
-      this.blockTokenSecretManager.addKeys(keys);
-      /*
-       * Balancer should sync its block keys with NN more frequently than NN
-       * updates its block keys
-       */
-      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its block keys every "
-          + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
-      this.shouldRun = true;
-      this.keyupdaterthread.start();
-    }
-    this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
-        .getEncryptDataTransfer();
-    // Check if there is another balancer running.
+    final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
+    this.keyManager = new KeyManager(blockpoolID, namenode,
+        defaults.getEncryptDataTransfer(), conf);
     // Exit if there is another one running.
-    out = checkAndMarkRunningBalancer(); 
+    out = checkAndMarkRunning(); 
     if (out == null) {
-      throw new IOException("Another balancer is running");
+      throw new IOException("Another " + name + " is running.");
     }
   }
 
-  boolean shouldContinue(long dispatchBlockMoveBytes) {
+  /** @return the block pool ID */
+  public String getBlockpoolID() {
+    return blockpoolID;
+  }
+
+  /** @return the namenode proxy. */
+  public NamenodeProtocol getNamenode() {
+    return namenode;
+  }
+
+  /** @return the client proxy. */
+  public ClientProtocol getClient() {
+    return client;
+  }
+
+  /** @return the key manager */
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
+
+  /** Should the instance continue running? */
+  public boolean shouldContinue(long dispatchBlockMoveBytes) {
     if (dispatchBlockMoveBytes > 0) {
       notChangedIterations = 0;
     } else {
@@ -137,53 +120,25 @@ class NameNodeConnector implements DataE
     return true;
   }
   
-  /** Get an access token for a block. */
-  Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
-      ) throws IOException {
-    if (!isBlockTokenEnabled) {
-      return BlockTokenSecretManager.DUMMY_TOKEN;
-    } else {
-      if (!shouldRun) {
-        throw new IOException(
-            "Can not get access token. BlockKeyUpdater is not running");
-      }
-      return blockTokenSecretManager.generateToken(null, eb,
-          EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
-          BlockTokenSecretManager.AccessMode.COPY));
-    }
-  }
-
-  @Override
-  public DataEncryptionKey newDataEncryptionKey() {
-    if (encryptDataTransfer) {
-      synchronized (this) {
-        if (encryptionKey == null) {
-          encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
-        }
-        return encryptionKey;
-      }
-    } else {
-      return null;
-    }
-  }
 
-  /* The idea for making sure that there is no more than one balancer
+  /**
+   * The idea for making sure that there is no more than one instance
    * running in an HDFS is to create a file in the HDFS, write the hostname
-   * of the machine on which the balancer is running to the file, but did not
-   * close the file until the balancer exits. 
-   * This prevents the second balancer from running because it can not
+   * of the machine on which the instance is running to the file, and to keep
+   * the file open until the instance exits. 
+   * 
+   * This prevents a second instance from running because it cannot
    * create the file while the first one is running.
    * 
-   * This method checks if there is any running balancer and 
-   * if no, mark yes if no.
+   * This method checks whether an instance is already running and, if not, marks this one as running.
    * Note that this is an atomic operation.
    * 
-   * Return null if there is a running balancer; otherwise the output stream
-   * to the newly created file.
+   * @return null if there is a running instance;
+   *         otherwise, the output stream to the newly created file.
    */
-  private OutputStream checkAndMarkRunningBalancer() throws IOException {
+  private OutputStream checkAndMarkRunning() throws IOException {
     try {
-      final DataOutputStream out = fs.create(BALANCER_ID_PATH);
+      final DataOutputStream out = fs.create(idPath);
       out.writeBytes(InetAddress.getLocalHost().getHostName());
       out.flush();
       return out;
@@ -196,24 +151,17 @@ class NameNodeConnector implements DataE
     }
   }
 
-  /** Close the connection. */
-  void close() {
-    shouldRun = false;
-    try {
-      if (keyupdaterthread != null) {
-        keyupdaterthread.interrupt();
-      }
-    } catch(Exception e) {
-      LOG.warn("Exception shutting down access key updater thread", e);
-    }
+  @Override
+  public void close() {
+    keyManager.close();
 
     // close the output file
     IOUtils.closeStream(out); 
     if (fs != null) {
       try {
-        fs.delete(BALANCER_ID_PATH, true);
+        fs.delete(idPath, true);
       } catch(IOException ioe) {
-        LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe);
+        LOG.warn("Failed to delete " + idPath, ioe);
       }
     }
   }
@@ -221,31 +169,6 @@ class NameNodeConnector implements DataE
   @Override
   public String toString() {
     return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
-        + ", id=" + blockpoolID
-        + "]";
-  }
-
-  /**
-   * Periodically updates access keys.
-   */
-  class BlockKeyUpdater implements Runnable {
-    @Override
-    public void run() {
-      try {
-        while (shouldRun) {
-          try {
-            blockTokenSecretManager.addKeys(namenode.getBlockKeys());
-          } catch (IOException e) {
-            LOG.error("Failed to set keys", e);
-          }
-          Thread.sleep(keyUpdaterInterval);
-        }
-      } catch (InterruptedException e) {
-        LOG.debug("InterruptedException in block key updater thread", e);
-      } catch (Throwable e) {
-        LOG.error("Exception in block key updater thread", e);
-        shouldRun = false;
-      }
-    }
+        + ", bpid=" + blockpoolID + "]";
   }
 }

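checkAndMarkRunning() generalizes the old balancer mutex: create the id file, write the local hostname, and hold the stream open until exit, so a second instance fails to create it (the patch detects this as a RemoteException wrapping AlreadyBeingCreatedException). A simplified sketch of the idea, using the overwrite flag rather than HDFS lease semantics:

    import java.io.DataOutputStream;
    import java.net.InetAddress;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RunningMarkerSketch {
      // Returns the stream, which the caller must keep open for its whole
      // lifetime and close (then delete idPath) on shutdown.
      static DataOutputStream checkAndMarkRunning(FileSystem fs, Path idPath)
          throws Exception {
        // overwrite=false: creation fails if another instance owns the file
        DataOutputStream out = fs.create(idPath, false);
        out.writeBytes(InetAddress.getLocalHost().getHostName());
        out.flush();
        return out;
      }
    }
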
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Thu Aug  7 07:38:23 2014
@@ -21,7 +21,6 @@ import java.util.LinkedList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.util.LightWeightGSet;
 
@@ -254,18 +253,18 @@ public class BlockInfo extends Block imp
   }
   /**
    * Find specified DatanodeStorageInfo.
-   * @return index or -1 if not found.
+   * @return DatanodeStorageInfo or null if not found.
    */
-  int findStorageInfo(DatanodeInfo dn) {
+  DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
     int len = getCapacity();
     for(int idx = 0; idx < len; idx++) {
       DatanodeStorageInfo cur = getStorageInfo(idx);
       if(cur == null)
         break;
       if(cur.getDatanodeDescriptor() == dn)
-        return idx;
+        return cur;
     }
-    return -1;
+    return null;
   }
   
   /**

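Returning the DatanodeStorageInfo itself, rather than an index into the internal triplets array, lets callers act on the storage directly. A hypothetical same-package caller (findStorageInfo and addBlock are package-private), not part of the patch:

    // Hypothetical helper: register the block on the storage that holds it.
    static boolean addToKnownStorage(BlockInfo blockInfo, DatanodeDescriptor dn) {
      DatanodeStorageInfo storage = blockInfo.findStorageInfo(dn);
      if (storage == null) {
        return false;               // dn holds no replica of this block
      }
      storage.addBlock(blockInfo);  // no index arithmetic at the call site
      return true;
    }
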
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Aug  7 07:38:23 2014
@@ -1082,6 +1082,7 @@ public class BlockManager {
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
+   * @param storageID the ID of the storage holding the replica, if known; null otherwise
    * @param reason a textual reason why the block should be marked corrupt,
    * for logging purposes
    */
@@ -1098,19 +1099,29 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
-        blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
-        dn, storageID);
-  }
 
-  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
-      DatanodeInfo dn, String storageID) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot mark " + b
+      throw new IOException("Cannot mark " + blk
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
+    
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+            blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+        storageID == null ? null : node.getStorageInfo(storageID),
+        node);
+  }
+
+  /**
+   * Mark the given block as corrupt.
+   * @param b the block to mark as corrupt
+   * @param storageInfo storage that contains the block, if known; null otherwise
+   * @throws IOException
+   */
+  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+      DatanodeStorageInfo storageInfo,
+      DatanodeDescriptor node) throws IOException {
 
     BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
@@ -1121,7 +1132,9 @@ public class BlockManager {
     } 
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(storageID, b.stored);
+    if (storageInfo != null) {
+      storageInfo.addBlock(b.stored);
+    }
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
@@ -1481,6 +1494,8 @@ public class BlockManager {
    * 
    * @throws IOException
    *           if the number of targets < minimum replication.
+   * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
+   *      List, boolean, Set, long, StorageType)
    */
   public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1719,7 +1734,7 @@ public class BlockManager {
    * @throws IOException
    */
   public boolean processReport(final DatanodeID nodeID,
-      final DatanodeStorage storage, final String poolId,
+      final DatanodeStorage storage,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
@@ -1751,9 +1766,9 @@ public class BlockManager {
       if (storageInfo.numBlocks() == 0) {
         // The first block report can be processed a lot more efficiently than
         // ordinary block reports.  This shortens restart times.
-        processFirstBlockReport(node, storage.getStorageID(), newReport);
+        processFirstBlockReport(storageInfo, newReport);
       } else {
-        processReport(node, storage, newReport);
+        processReport(storageInfo, newReport);
       }
       
       // Now that we have an up-to-date block report, we know that any
@@ -1815,9 +1830,8 @@ public class BlockManager {
     }
   }
   
-  private void processReport(final DatanodeDescriptor node,
-      final DatanodeStorage storage,
-      final BlockListAsLongs report) throws IOException {
+  private void processReport(final DatanodeStorageInfo storageInfo,
+                             final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -1827,19 +1841,20 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, storage, report,
+    reportDiff(storageInfo, report,
         toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+   
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storage.getStorageID());
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1853,7 +1868,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storage.getStorageID());
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -1864,16 +1879,16 @@ public class BlockManager {
    * a toRemove list (since there won't be any).  It also silently discards 
    * any invalid blocks, thereby deferring their processing until 
    * the next block report.
-   * @param node - DatanodeDescriptor of the node that sent the report
+   * @param storageInfo - DatanodeStorageInfo that sent the report
    * @param report - the initial block report, to be processed
    * @throws IOException 
    */
-  private void processFirstBlockReport(final DatanodeDescriptor node,
-      final String storageID,
+  private void processFirstBlockReport(
+      final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
-    assert (node.getStorageInfo(storageID).numBlocks() == 0);
+    assert (storageInfo.numBlocks() == 0);
     BlockReportIterator itBR = report.getBlockReportIterator();
 
     while(itBR.hasNext()) {
@@ -1882,7 +1897,7 @@ public class BlockManager {
       
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
-        queueReportedBlock(node, storageID, iblk, reportedState,
+        queueReportedBlock(storageInfo, iblk, reportedState,
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
@@ -1894,15 +1909,16 @@ public class BlockManager {
       // If block is corrupt, mark it and continue to next block.
       BlockUCState ucState = storedBlock.getBlockUCState();
       BlockToMarkCorrupt c = checkReplicaCorrupt(
-          iblk, reportedState, storedBlock, ucState, node);
+          iblk, reportedState, storedBlock, ucState,
+          storageInfo.getDatanodeDescriptor());
       if (c != null) {
         if (shouldPostponeBlocksFromFuture) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
-          queueReportedBlock(node, storageID, iblk, reportedState,
+          queueReportedBlock(storageInfo, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c, node, storageID);
+          markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
         }
         continue;
       }
@@ -1910,7 +1926,7 @@ public class BlockManager {
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            node.getStorageInfo(storageID), iblk, reportedState);
+            storageInfo, iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
@@ -1923,12 +1939,12 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, node, storageID);
+        addStoredBlockImmediate(storedBlock, storageInfo);
       }
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, 
+  private void reportDiff(DatanodeStorageInfo storageInfo, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
@@ -1936,8 +1952,6 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
-
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -1954,7 +1968,7 @@ public class BlockManager {
     while(itBR.hasNext()) {
       Block iblk = itBR.next();
       ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+      BlockInfo storedBlock = processReportedBlock(storageInfo,
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 
       // move block to the head of the list
@@ -1991,7 +2005,7 @@ public class BlockManager {
    * BlockInfoUnderConstruction's list of replicas.</li>
    * </ol>
    * 
-   * @param dn descriptor for the datanode that made the report
+   * @param storageInfo DatanodeStorageInfo that sent the report.
    * @param block reported block replica
    * @param reportedState reported replica state
    * @param toAdd add to DatanodeDescriptor
@@ -2003,14 +2017,16 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
-      final String storageID,
+  private BlockInfo processReportedBlock(
+      final DatanodeStorageInfo storageInfo,
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
     
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("Reported block " + block
           + " on " + dn + " size " + block.getNumBytes()
@@ -2019,7 +2035,7 @@ public class BlockManager {
   
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(dn, storageID, block, reportedState,
+      queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
@@ -2059,7 +2075,7 @@ public class BlockManager {
         // TODO: Pretty confident this should be s/storedBlock/block below,
         // since we should be postponing the info of the reported block, not
         // the stored block. See HDFS-6289 for more context.
-        queueReportedBlock(dn, storageID, storedBlock, reportedState,
+        queueReportedBlock(storageInfo, storedBlock, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
       } else {
         toCorrupt.add(c);
@@ -2088,17 +2104,17 @@ public class BlockManager {
    * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
-  private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queueing reported block " + block +
           " in state " + reportedState + 
-          " from datanode " + dn + " for later processing " +
-          "because " + reason + ".");
+          " from datanode " + storageInfo.getDatanodeDescriptor() +
+          " for later processing because " + reason + ".");
     }
-    pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
+    pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
   }
 
   /**
@@ -2121,7 +2137,7 @@ public class BlockManager {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing previouly queued message " + rbi);
       }
-      processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), 
+      processAndHandleReportedBlock(rbi.getStorageInfo(), 
           rbi.getBlock(), rbi.getReportedState(), null);
     }
   }
@@ -2178,6 +2194,16 @@ public class BlockManager {
         } else {
           return null; // not corrupt
         }
+      case UNDER_CONSTRUCTION:
+        if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
+          final long reportedGS = reported.getGenerationStamp();
+          return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
+              + ucState + " and reported state " + reportedState
+              + ", But reported genstamp " + reportedGS
+              + " does not match genstamp in block map "
+              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+        }
+        return null;
       default:
         return null;
       }
@@ -2241,19 +2267,20 @@ public class BlockManager {
   }
 
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
-      DatanodeDescriptor node, String storageID) throws IOException {
+      DatanodeStorageInfo storageInfo) throws IOException {
     BlockInfoUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
-        ucBlock.reportedBlock, ucBlock.reportedState);
+    block.addReplicaIfNotPresent(
+        storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
 
-    if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
-      addStoredBlock(block, node, storageID, null, true);
+    if (ucBlock.reportedState == ReplicaState.FINALIZED &&
+        block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
+      addStoredBlock(block, storageInfo, null, true);
     }
   } 
 
   /**
    * Faster version of
-   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
+   * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
    * , intended for use with initial block report at startup. If not in startup
    * safe mode, will call standard addStoredBlock(). Assumes this method is
    * called "immediately" so there is no need to refresh the storedBlock from
@@ -2264,17 +2291,17 @@ public class BlockManager {
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
-      DatanodeDescriptor node, String storageID)
+      DatanodeStorageInfo storageInfo)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, node, storageID, null, false);
+      addStoredBlock(storedBlock, storageInfo, null, false);
       return;
     }
 
     // just add it
-    node.addBlock(storageID, storedBlock);
+    storageInfo.addBlock(storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2296,13 +2323,13 @@ public class BlockManager {
    * @return the block that is stored in blockMap.
    */
   private Block addStoredBlock(final BlockInfo block,
-                               DatanodeDescriptor node,
-                               String storageID,
+                               DatanodeStorageInfo storageInfo,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
       storedBlock = blocksMap.getStoredBlock(block);
@@ -2322,7 +2349,7 @@ public class BlockManager {
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = node.addBlock(storageID, storedBlock);
+    boolean added = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
     if (added) {
@@ -2872,8 +2899,9 @@ public class BlockManager {
    * The given node is reporting that it received a certain block.
    */
   @VisibleForTesting
-  void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
+  void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
       throws IOException {
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
     // RECEIVED_BLOCK), we currently also decrease the approximate number. 
@@ -2893,12 +2921,12 @@ public class BlockManager {
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.decrement(block, node);
-    processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
+    processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
   
-  private void processAndHandleReportedBlock(DatanodeDescriptor node,
-      String storageID, Block block,
+  private void processAndHandleReportedBlock(
+      DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
@@ -2906,7 +2934,9 @@ public class BlockManager {
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    processReportedBlock(node, storageID, block, reportedState,
+    final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+
+    processReportedBlock(storageInfo, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
@@ -2914,11 +2944,11 @@ public class BlockManager {
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, node, storageID);
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+      addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2932,7 +2962,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node, storageID);
+      markBlockAsCorrupt(b, storageInfo, node);
     }
   }
 
@@ -2959,13 +2989,15 @@ public class BlockManager {
           "Got incremental block report from unregistered or dead node");
     }
 
-    if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+    DatanodeStorageInfo storageInfo =
+        node.getStorageInfo(srdb.getStorage().getStorageID());
+    if (storageInfo == null) {
       // The DataNode is reporting an unknown storage. Usually the NN learns
       // about new storages from heartbeats but during NN restart we may
       // receive a block report or incremental report before the heartbeat.
       // We must handle this for protocol compatibility. This issue was
       // uncovered by HDFS-6094.
-      node.updateStorage(srdb.getStorage());
+      storageInfo = node.updateStorage(srdb.getStorage());
     }
 
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@@ -2975,14 +3007,13 @@ public class BlockManager {
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
-            rdbi.getBlock(), ReplicaState.RBW, null);
+        processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
+                                      ReplicaState.RBW, null);
         break;
       default:
         String msg = 
@@ -3174,6 +3205,15 @@ public class BlockManager {
         }
       }
     }
+
+    if (!status && !srcNode.isAlive) {
+      LOG.warn("srcNode " + srcNode + " is dead " +
+          "while decommission is in progress. Continuing to mark it as " +
+          "decommission-in-progress so that it can resume the decommission " +
+          "process when it rejoins the cluster.");
+      status = true;
+    }
+
     srcNode.decommissioningStatus.set(underReplicatedBlocks,
         decommissionOnlyReplicas, 
         underReplicatedInOpenFiles);
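
The BlockManager hunks above all apply one refactoring: methods that used to
take a (DatanodeDescriptor, String storageID) pair now take a single
DatanodeStorageInfo and recover the descriptor via getDatanodeDescriptor().
The following is a minimal, self-contained sketch of that pattern, with
hypothetical simplified classes rather than the real HDFS types, including the
protocol-compatibility fallback to updateStorage() for storages the NameNode
has not yet learned about from a heartbeat:

import java.util.HashMap;
import java.util.Map;

class StorageRefactorSketch {
  static class Storage {
    final String id;
    final Node node;
    Storage(String id, Node node) { this.id = id; this.node = node; }
    Node getDatanodeDescriptor() { return node; }
  }

  static class Node {
    private final Map<String, Storage> storageMap = new HashMap<>();
    Storage getStorageInfo(String storageID) {
      return storageMap.get(storageID);
    }
    // Mirrors node.updateStorage(...): lazily registers an unknown storage,
    // which the NN must tolerate when a report arrives before the heartbeat.
    Storage updateStorage(String storageID) {
      return storageMap.computeIfAbsent(storageID, id -> new Storage(id, this));
    }
  }

  // New-style entry point: resolve the storage once, up front.
  static void processReport(Node node, String storageID) {
    Storage storageInfo = node.getStorageInfo(storageID);
    if (storageInfo == null) {
      storageInfo = node.updateStorage(storageID); // compatibility fallback
    }
    addBlock(storageInfo); // downstream methods take the storage, not (node, id)
  }

  static void addBlock(Storage storageInfo) {
    // The descriptor is always recoverable from the storage when needed.
    Node node = storageInfo.getDatanodeDescriptor();
    System.out.println("added block on " + node + " via " + storageInfo.id);
  }

  public static void main(String[] args) {
    processReport(new Node(), "DS-1"); // unknown storage: falls back safely
  }
}

Resolving the storage once at the report boundary removes repeated map lookups
and makes it impossible to hand a downstream method a storage ID that does not
belong to the node.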

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
 Thu Aug  7 07:38:23 2014
@@ -23,8 +23,8 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.SetIterator;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
@@ -217,9 +217,14 @@ class BlocksMap {
     BlockInfo currentBlock = blocks.get(newBlock);
     assert currentBlock != null : "the block is not in blocksMap";
     // replace block in data-node lists
-    for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
-      DatanodeDescriptor dn = currentBlock.getDatanode(idx);
-      dn.replaceBlock(currentBlock, newBlock);
+    for (int i = currentBlock.numNodes() - 1; i >= 0; i--) {
+      final DatanodeDescriptor dn = currentBlock.getDatanode(i);
+      final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn);
+      final boolean removed = storage.removeBlock(currentBlock);
+      Preconditions.checkState(removed, "currentBlock not found.");
+
+      final boolean added = storage.addBlock(newBlock);
+      Preconditions.checkState(added, "newBlock already exists.");
     }
     // replace block in the map itself
     blocks.put(newBlock);
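
The rewritten replaceBlock above moves the per-storage swap into BlocksMap and
fails fast when either step breaks the invariant: the old block must be present
to remove, and the new block must not already be present. A hedged sketch of
that remove-then-add contract, using plain runtime checks in place of Guava's
Preconditions and hypothetical simplified types:

import java.util.HashSet;
import java.util.Set;

class ReplaceBlockSketch {
  static class Storage {
    private final Set<String> blocks = new HashSet<>();
    boolean addBlock(String b) { return blocks.add(b); }
    boolean removeBlock(String b) { return blocks.remove(b); }
  }

  // Mirrors the new loop body: remove the current block, add the replacement,
  // and surface any violation immediately instead of silently corrupting the
  // per-storage block list.
  static void replaceBlock(Storage storage, String oldBlock, String newBlock) {
    if (!storage.removeBlock(oldBlock)) {
      throw new IllegalStateException("currentBlock not found.");
    }
    if (!storage.addBlock(newBlock)) {
      throw new IllegalStateException("newBlock already exists.");
    }
  }

  public static void main(String[] args) {
    Storage s = new Storage();
    s.addBlock("blk_1");
    replaceBlock(s, "blk_1", "blk_2"); // succeeds; violations now fail fast
  }
}

Unlike the removed DatanodeDescriptor.replaceBlock, which relied on asserts
that are disabled in production, the Preconditions checks run unconditionally.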

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
 Thu Aug  7 07:38:23 2014
@@ -48,18 +48,6 @@ public class CorruptReplicasMap{
 
   private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
     new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
-  
-  /**
-   * Mark the block belonging to datanode as corrupt.
-   *
-   * @param blk Block to be added to CorruptReplicasMap
-   * @param dn DatanodeDescriptor which holds the corrupt replica
-   * @param reason a textual reason (for logging purposes)
-   */
-  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
-      String reason) {
-    addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
-  }
 
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -69,7 +57,7 @@ public class CorruptReplicasMap{
    * @param reason a textual reason (for logging purposes)
    * @param reasonCode the enum representation of the reason
    */
-  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+  void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       String reason, Reason reasonCode) {
     Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
@@ -127,7 +115,6 @@ public class CorruptReplicasMap{
   boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
       Reason reason) {
     Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
-    boolean removed = false;
     if (datanodes==null)
       return false;
 
@@ -174,12 +161,12 @@ public class CorruptReplicasMap{
     return ((nodes != null) && (nodes.contains(node)));
   }
 
-  public int numCorruptReplicas(Block blk) {
+  int numCorruptReplicas(Block blk) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
     return (nodes == null) ? 0 : nodes.size();
   }
   
-  public int size() {
+  int size() {
     return corruptReplicasMap.size();
   }
 

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 Thu Aug  7 07:38:23 2014
@@ -234,18 +234,6 @@ public class DatanodeDescriptor extends 
     updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
-  /**
-   * Add data-node to the block. Add block to the head of the list of blocks
-   * belonging to the data-node.
-   */
-  public boolean addBlock(String storageID, BlockInfo b) {
-    DatanodeStorageInfo s = getStorageInfo(storageID);
-    if (s != null) {
-      return s.addBlock(b);
-    }
-    return false;
-  }
-
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
@@ -260,8 +248,8 @@ public class DatanodeDescriptor extends 
   }
 
   public StorageReport[] getStorageReports() {
-    final StorageReport[] reports = new StorageReport[storageMap.size()];
     final DatanodeStorageInfo[] infos = getStorageInfos();
+    final StorageReport[] reports = new StorageReport[infos.length];
     for(int i = 0; i < infos.length; i++) {
       reports[i] = infos[i].toStorageReport();
     }
@@ -284,13 +272,10 @@ public class DatanodeDescriptor extends 
    * data-node from the block.
    */
   boolean removeBlock(BlockInfo b) {
-    int index = b.findStorageInfo(this);
+    final DatanodeStorageInfo s = b.findStorageInfo(this);
     // if block exists on this datanode
-    if (index >= 0) {
-      DatanodeStorageInfo s = b.getStorageInfo(index);
-      if (s != null) {
-        return s.removeBlock(b);
-      }
+    if (s != null) {
+      return s.removeBlock(b);
     }
     return false;
   }
@@ -307,24 +292,6 @@ public class DatanodeDescriptor extends 
     return false;
   }
 
-  /**
-   * Replace specified old block with a new one in the DataNodeDescriptor.
-   *
-   * @param oldBlock - block to be replaced
-   * @param newBlock - a replacement block
-   * @return the new block
-   */
-  public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
-    int index = oldBlock.findStorageInfo(this);
-    DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
-    boolean done = s.removeBlock(oldBlock);
-    assert done : "Old block should belong to the data-node when replacing";
-
-    done = s.addBlock(newBlock);
-    assert done : "New block should not belong to the data-node when 
replacing";
-    return newBlock;
-  }
-
   public void resetBlocks() {
     setCapacity(0);
     setRemaining(0);
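
The removeBlock rewrite above depends on findStorageInfo returning the matching
DatanodeStorageInfo directly (or null) rather than an index, which collapses
the index-then-lookup sequence into a single null check. A minimal sketch of
that shape, with hypothetical simplified types:

import java.util.ArrayList;
import java.util.List;

class RemoveBlockSketch {
  static class Storage {
    final Object owner;
    Storage(Object owner) { this.owner = owner; }
    boolean removeBlock(BlockInfo b) { return b.storages.remove(this); }
  }

  static class BlockInfo {
    final List<Storage> storages = new ArrayList<>();
    // New-style lookup: return the matching storage, or null if this
    // datanode holds no replica of the block.
    Storage findStorageInfo(Object datanode) {
      for (Storage s : storages) {
        if (s.owner == datanode) return s;
      }
      return null;
    }
  }

  // Mirrors the rewritten DatanodeDescriptor.removeBlock(...)
  static boolean removeBlock(Object datanode, BlockInfo b) {
    Storage s = b.findStorageInfo(datanode);
    return s != null && s.removeBlock(b);
  }

  public static void main(String[] args) {
    Object dn = new Object();
    BlockInfo b = new BlockInfo();
    b.storages.add(new Storage(dn));
    System.out.println(removeBlock(dn, b)); // true
  }
}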

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
 Thu Aug  7 07:38:23 2014
@@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  boolean addBlock(BlockInfo b) {
+  public boolean addBlock(BlockInfo b) {
     if(!b.addStorage(this))
       return false;
     // add to the head of the data-node list

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
 Thu Aug  7 07:38:23 2014
@@ -23,6 +23,7 @@ import java.util.Queue;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -41,14 +42,12 @@ class PendingDataNodeMessages {
     
   static class ReportedBlockInfo {
     private final Block block;
-    private final DatanodeDescriptor dn;
-    private final String storageID;
+    private final DatanodeStorageInfo storageInfo;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
+    ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
         ReplicaState reportedState) {
-      this.dn = dn;
-      this.storageID = storageID;
+      this.storageInfo = storageInfo;
       this.block = block;
       this.reportedState = reportedState;
     }
@@ -57,21 +56,18 @@ class PendingDataNodeMessages {
       return block;
     }
 
-    DatanodeDescriptor getNode() {
-      return dn;
-    }
-    
-    String getStorageID() {
-      return storageID;
-    }
-
     ReplicaState getReportedState() {
       return reportedState;
     }
+    
+    DatanodeStorageInfo getStorageInfo() {
+      return storageInfo;
+    }
 
     @Override
     public String toString() {
-      return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+      return "ReportedBlockInfo [block=" + block + ", dn="
+          + storageInfo.getDatanodeDescriptor()
           + ", reportedState=" + reportedState + "]";
     }
   }
@@ -87,7 +83,7 @@ class PendingDataNodeMessages {
       Queue<ReportedBlockInfo> oldQueue = entry.getValue();
       while (!oldQueue.isEmpty()) {
         ReportedBlockInfo rbi = oldQueue.remove();
-        if (!rbi.getNode().equals(dn)) {
+        if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
           newQueue.add(rbi);
         } else {
           count--;
@@ -97,11 +93,11 @@ class PendingDataNodeMessages {
     }
   }
   
-  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, storageID, block, reportedState));
+        new ReportedBlockInfo(storageInfo, block, reportedState));
     count++;
   }
   
@@ -127,7 +123,7 @@ class PendingDataNodeMessages {
     return queue;
   }
   
-  public int count() {
+  int count() {
     return count ;
   }
 
@@ -144,7 +140,7 @@ class PendingDataNodeMessages {
     return sb.toString();
   }
 
-  public Iterable<ReportedBlockInfo> takeAll() {
+  Iterable<ReportedBlockInfo> takeAll() {
     List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity(
         count);
     for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {
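
With ReportedBlockInfo now holding a DatanodeStorageInfo, per-datanode cleanup
in removeAllMessagesForDatanode compares the storage's owning descriptor
instead of a separately stored node field, so the two can never disagree. A
short sketch of that queue filtering, with hypothetical simplified types:

import java.util.ArrayDeque;
import java.util.Queue;

class PendingMessagesSketch {
  static class Storage {
    final String datanode;
    Storage(String datanode) { this.datanode = datanode; }
  }

  static class ReportedBlockInfo {
    final Storage storage; // single source of truth for the originating node
    final String block;
    ReportedBlockInfo(Storage storage, String block) {
      this.storage = storage;
      this.block = block;
    }
  }

  public static void main(String[] args) {
    Queue<ReportedBlockInfo> queue = new ArrayDeque<>();
    queue.add(new ReportedBlockInfo(new Storage("dn1"), "blk_1"));
    queue.add(new ReportedBlockInfo(new Storage("dn2"), "blk_2"));

    // Remove every message that originated from dn1, keep the rest.
    Queue<ReportedBlockInfo> kept = new ArrayDeque<>();
    while (!queue.isEmpty()) {
      ReportedBlockInfo rbi = queue.remove();
      if (!rbi.storage.datanode.equals("dn1")) {
        kept.add(rbi);
      }
    }
    System.out.println(kept.size()); // 1
  }
}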

Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 (original)
+++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 Thu Aug  7 07:38:23 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
    */
   private long lastActiveClaimTxId = -1;
 
+  private final ReentrantReadWriteLock mReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock mReadLock  = mReadWriteLock.readLock();
+  private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    mReadLock.lock();
+  }
+
+  void readUnlock() {
+    mReadLock.unlock();
+  }
+
+  void writeLock() {
+    mWriteLock.lock();
+  }
+
+  void writeUnlock() {
+    mWriteLock.unlock();
+  }
+
   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
@@ -135,14 +160,19 @@ class BPOfferService {
     }
     return false;
   }
-  
-  synchronized String getBlockPoolId() {
-    if (bpNSInfo != null) {
-      return bpNSInfo.getBlockPoolID();
-    } else {
-      LOG.warn("Block pool ID needed, but service not yet registered with NN",
-          new Exception("trace"));
-      return null;
+
+  String getBlockPoolId() {
+    readLock();
+    try {
+      if (bpNSInfo != null) {
+        return bpNSInfo.getBlockPoolID();
+      } else {
+        LOG.warn("Block pool ID needed, but service not yet registered with 
NN",
+            new Exception("trace"));
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -150,27 +180,37 @@ class BPOfferService {
     return getNamespaceInfo() != null;
   }
 
-  synchronized NamespaceInfo getNamespaceInfo() {
-    return bpNSInfo;
+  NamespaceInfo getNamespaceInfo() {
+    readLock();
+    try {
+      return bpNSInfo;
+    } finally {
+      readUnlock();
+    }
   }
 
   @Override
-  public synchronized String toString() {
-    if (bpNSInfo == null) {
-      // If we haven't yet connected to our NN, we don't yet know our
-      // own block pool ID.
-      // If _none_ of the block pools have connected yet, we don't even
-      // know the DatanodeID ID of this DN.
-      String datanodeUuid = dn.getDatanodeUuid();
+  public String toString() {
+    readLock();
+    try {
+      if (bpNSInfo == null) {
+        // If we haven't yet connected to our NN, we don't yet know our
+        // own block pool ID.
+        // If _none_ of the block pools have connected yet, we don't even
+        // know the DatanodeID ID of this DN.
+        String datanodeUuid = dn.getDatanodeUuid();
 
-      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
-        datanodeUuid = "unassigned";
+        if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+          datanodeUuid = "unassigned";
+        }
+        return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+      } else {
+        return "Block pool " + getBlockPoolId() +
+            " (Datanode Uuid " + dn.getDatanodeUuid() +
+            ")";
       }
-      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
-    } else {
-      return "Block pool " + getBlockPoolId() +
-          " (Datanode Uuid " + dn.getDatanodeUuid() +
-          ")";
+    } finally {
+      readUnlock();
     }
   }
   
@@ -266,32 +306,37 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
-    if (this.bpNSInfo == null) {
-      this.bpNSInfo = nsInfo;
-      boolean success = false;
-
-      // Now that we know the namespace ID, etc, we can pass this to the DN.
-      // The DN can now initialize its local storage if we are the
-      // first BP to handshake, etc.
-      try {
-        dn.initBlockPool(this);
-        success = true;
-      } finally {
-        if (!success) {
-          // The datanode failed to initialize the BP. We need to reset
-          // the namespace info so that other BPService actors still have
-          // a chance to set it, and re-initialize the datanode.
-          this.bpNSInfo = null;
+  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    writeLock();
+    try {
+      if (this.bpNSInfo == null) {
+        this.bpNSInfo = nsInfo;
+        boolean success = false;
+
+        // Now that we know the namespace ID, etc, we can pass this to the DN.
+        // The DN can now initialize its local storage if we are the
+        // first BP to handshake, etc.
+        try {
+          dn.initBlockPool(this);
+          success = true;
+        } finally {
+          if (!success) {
+            // The datanode failed to initialize the BP. We need to reset
+            // the namespace info so that other BPService actors still have
+            // a chance to set it, and re-initialize the datanode.
+            this.bpNSInfo = null;
+          }
         }
+      } else {
+        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+            "Blockpool ID");
+        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+            "Namespace ID");
+        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+            "Cluster ID");
       }
-    } else {
-      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
-          "Blockpool ID");
-      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
-          "Namespace ID");
-      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
-          "Cluster ID");
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -300,22 +345,27 @@ class BPOfferService {
    * NN, it calls this function to verify that the NN it connected to
    * is consistent with other NNs serving the block-pool.
    */
-  synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+  void registrationSucceeded(BPServiceActor bpServiceActor,
       DatanodeRegistration reg) throws IOException {
-    if (bpRegistration != null) {
-      checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
-          reg.getStorageInfo().getNamespaceID(), "namespace ID");
-      checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
-          reg.getStorageInfo().getClusterID(), "cluster ID");
-    } else {
-      bpRegistration = reg;
-    }
-    
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-    // Add the initial block token secret keys to the DN's secret manager.
-    if (dn.isBlockTokenEnabled) {
-      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
-          reg.getExportedKeys());
+    writeLock();
+    try {
+      if (bpRegistration != null) {
+        checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+            reg.getStorageInfo().getNamespaceID(), "namespace ID");
+        checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+            reg.getStorageInfo().getClusterID(), "cluster ID");
+      } else {
+        bpRegistration = reg;
+      }
+
+      dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+      // Add the initial block token secret keys to the DN's secret manager.
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+            reg.getExportedKeys());
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -333,25 +383,35 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
-    Preconditions.checkState(bpNSInfo != null,
-        "getRegistration() can only be called after initial handshake");
-    return dn.createBPRegistration(bpNSInfo);
+  DatanodeRegistration createRegistration() {
+    writeLock();
+    try {
+      Preconditions.checkState(bpNSInfo != null,
+          "getRegistration() can only be called after initial handshake");
+      return dn.createBPRegistration(bpNSInfo);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Called when an actor shuts down. If this is the last actor
    * to shut down, shuts down the whole blockpool in the DN.
    */
-  synchronized void shutdownActor(BPServiceActor actor) {
-    if (bpServiceToActive == actor) {
-      bpServiceToActive = null;
-    }
+  void shutdownActor(BPServiceActor actor) {
+    writeLock();
+    try {
+      if (bpServiceToActive == actor) {
+        bpServiceToActive = null;
+      }
 
-    bpServices.remove(actor);
+      bpServices.remove(actor);
 
-    if (bpServices.isEmpty()) {
-      dn.shutdownBlockPool(this);
+      if (bpServices.isEmpty()) {
+        dn.shutdownBlockPool(this);
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -392,11 +452,16 @@ class BPOfferService {
    * @return a proxy to the active NN, or null if the BPOS has not
    * acknowledged any NN as active yet.
    */
-  synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
-    if (bpServiceToActive != null) {
-      return bpServiceToActive.bpNamenode;
-    } else {
-      return null;
+  DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+    readLock();
+    try {
+      if (bpServiceToActive != null) {
+        return bpServiceToActive.bpNamenode;
+      } else {
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -424,45 +489,50 @@ class BPOfferService {
    * @param actor the actor which received the heartbeat
    * @param nnHaState the HA-related heartbeat contents
    */
-  synchronized void updateActorStatesFromHeartbeat(
+  void updateActorStatesFromHeartbeat(
       BPServiceActor actor,
       NNHAStatusHeartbeat nnHaState) {
-    final long txid = nnHaState.getTxId();
-    
-    final boolean nnClaimsActive =
-      nnHaState.getState() == HAServiceState.ACTIVE;
-    final boolean bposThinksActive = bpServiceToActive == actor;
-    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; 
-    
-    if (nnClaimsActive && !bposThinksActive) {
-      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
-          "txid=" + txid);
-      if (!isMoreRecentClaim) {
-        // Split-brain scenario - an NN is trying to claim active
-        // state when a different NN has already claimed it with a higher
-        // txid.
-        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
-            txid + " but there was already a more recent claim at txid=" +
-            lastActiveClaimTxId);
-        return;
-      } else {
-        if (bpServiceToActive == null) {
-          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+    writeLock();
+    try {
+      final long txid = nnHaState.getTxId();
+
+      final boolean nnClaimsActive =
+          nnHaState.getState() == HAServiceState.ACTIVE;
+      final boolean bposThinksActive = bpServiceToActive == actor;
+      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+      if (nnClaimsActive && !bposThinksActive) {
+        LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+            "txid=" + txid);
+        if (!isMoreRecentClaim) {
+          // Split-brain scenario - an NN is trying to claim active
+          // state when a different NN has already claimed it with a higher
+          // txid.
+          LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+              txid + " but there was already a more recent claim at txid=" +
+              lastActiveClaimTxId);
+          return;
         } else {
-          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
-              bpServiceToActive + " at higher txid=" + txid);
+          if (bpServiceToActive == null) {
+            LOG.info("Acknowledging ACTIVE Namenode " + actor);
+          } else {
+            LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+                bpServiceToActive + " at higher txid=" + txid);
+          }
+          bpServiceToActive = actor;
         }
-        bpServiceToActive = actor;
+      } else if (!nnClaimsActive && bposThinksActive) {
+        LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+            "txid=" + nnHaState.getTxId());
+        bpServiceToActive = null;
       }
-    } else if (!nnClaimsActive && bposThinksActive) {
-      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
-          "txid=" + nnHaState.getTxId());
-      bpServiceToActive = null;
-    }
-    
-    if (bpServiceToActive == actor) {
-      assert txid >= lastActiveClaimTxId;
-      lastActiveClaimTxId = txid;
+
+      if (bpServiceToActive == actor) {
+        assert txid >= lastActiveClaimTxId;
+        lastActiveClaimTxId = txid;
+      }
+    } finally {
+      writeUnlock();
     }
   }
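
updateActorStatesFromHeartbeat carries the core reasoning in this file: an NN's
claim to the ACTIVE state is honored only if its transaction id is strictly
newer than the last accepted claim, so the stale side of a split-brain is
ignored. A hedged, self-contained sketch of that arbitration (hypothetical
simplified state, not the real class):

class ActiveClaimSketch {
  private String active;                  // NN we currently consider active
  private long lastActiveClaimTxId = -1;  // txid of the last accepted claim

  void onHeartbeat(String nn, boolean claimsActive, long txid) {
    boolean thinksActive = nn.equals(active);
    if (claimsActive && !thinksActive) {
      if (txid > lastActiveClaimTxId) {
        active = nn;                      // strictly newer claim wins
      }                                   // else: stale claim, ignore it
    } else if (!claimsActive && thinksActive) {
      active = null;                      // the active NN stepped down
    }
    if (nn.equals(active)) {
      lastActiveClaimTxId = txid;         // track the accepted claim's txid
    }
  }

  public static void main(String[] args) {
    ActiveClaimSketch s = new ActiveClaimSketch();
    s.onHeartbeat("nn1", true, 10);
    s.onHeartbeat("nn2", true, 5);        // stale claim: rejected
    System.out.println(s.active);         // nn1
  }
}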
 
@@ -533,12 +603,15 @@ class BPOfferService {
       actor.reRegister();
       return true;
     }
-    synchronized (this) {
+    writeLock();
+    try {
       if (actor == bpServiceToActive) {
         return processCommandFromActive(cmd, actor);
       } else {
         return processCommandFromStandby(cmd, actor);
       }
+    } finally {
+      writeUnlock();
     }
   }
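
Taken together, the BPOfferService hunks replace method-level synchronized with
an explicit ReentrantReadWriteLock, so read-mostly accessors such as
getBlockPoolId and getNamespaceInfo can run concurrently while registration and
actor state changes serialize on the write lock. A minimal sketch of the
wrapping pattern (hypothetical simplified class, not the real BPOfferService):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLockSketch {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock();
  private final Lock writeLock = rwLock.writeLock();
  private String state; // guarded by rwLock

  // Read path: many threads may hold the read lock at once.
  String getState() {
    readLock.lock();
    try {
      return state;
    } finally {
      readLock.unlock();
    }
  }

  // Write path: exclusive; blocks until all readers drain.
  void setState(String newState) {
    writeLock.lock();
    try {
      state = newState;
    } finally {
      writeLock.unlock();
    }
  }

  public static void main(String[] args) {
    ReadWriteLockSketch s = new ReadWriteLockSketch();
    s.setState("registered");
    System.out.println(s.getState());
  }
}

One property the real code relies on: a thread holding the write lock may still
acquire the read lock (ReentrantReadWriteLock permits this), which is why
registrationSucceeded can call getBlockPoolId while holding the write lock.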
 

