HDFS-6801. Archival Storage: Add a new data migration tool. Contributed by Tsz Wo Nicholas Sze.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1618675 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5d5aae06
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5d5aae06
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5d5aae06

Branch: refs/heads/trunk
Commit: 5d5aae0694bc27df5b9fa50819854cd3050a8658
Parents: cb75b6b
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Aug 18 17:51:18 2014 +0000
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Aug 18 17:51:18 2014 +0000

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/BlockStoragePolicy.java  |  22 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   6 +
 .../hadoop/hdfs/server/balancer/Balancer.java   |  19 +-
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 135 ++++--
 .../hdfs/server/balancer/NameNodeConnector.java |  26 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  | 431 +++++++++++++++++++
 6 files changed, 577 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
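
For reference, a minimal sketch (not part of this patch) of how the new Mover
might be launched through its Tool entry point. The Mover.Cli class, the
ToolRunner call, and the dfs.mover.* configuration keys come from the diff
below; the example class itself, its placement in the same package (Mover.Cli
is package-private in this patch), and the tuning values are illustrative
assumptions only.

----------------------------------------------------------------------
// Illustrative sketch only; values are examples, not recommendations.
package org.apache.hadoop.hdfs.server.mover;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.ToolRunner;

public class MoverLaunchExample {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new HdfsConfiguration();
    // Window (ms) in which already-moved blocks are remembered; default is 5400*1000.
    conf.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 60 * 60 * 1000L);
    // Size of the thread pool that executes scheduled block moves; default is 1000.
    conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 500);
    // Mover.Cli resolves the namenode URIs and repeatedly scans the namespace,
    // scheduling replica moves; in this initial version it returns only on error.
    System.exit(ToolRunner.run(conf, new Mover.Cli(), args));
  }
}
----------------------------------------------------------------------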


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5aae06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
index c39d04c..a093525 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
@@ -180,7 +180,23 @@ public class BlockStoragePolicy {
   public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
     return getFallback(unavailables, replicationFallbacks);
   }
-  
+
+  @Override
+  public int hashCode() {
+    return Byte.valueOf(id).hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    } else if (obj == null || !(obj instanceof BlockStoragePolicy)) {
+      return false;
+    }
+    final BlockStoragePolicy that = (BlockStoragePolicy)obj;
+    return this.id == that.id;
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + "{" + name + ":" + id
@@ -193,6 +209,10 @@ public class BlockStoragePolicy {
     return id;
   }
 
+  public String getName() {
+    return name;
+  }
+
   private static StorageType getFallback(EnumSet<StorageType> unavailables,
       StorageType[] fallbacks) {
     for(StorageType fb : fallbacks) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5aae06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 564f037..be6faeb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -362,6 +362,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
   public static final String  DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
   public static final int     DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
+  
+  public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
+  public static final long    DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
+  public static final String  DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
+  public static final int     DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
+
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 50010;
   public static final String  DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_DEFAULT_PORT;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5aae06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 7661d25..ee49bcf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URI;
 import java.text.DateFormat;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -54,6 +53,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -270,7 +270,7 @@ public class Balancer {
     //   over-utilized, above-average, below-average and under-utilized.
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for(DatanodeStorageReport r : reports) {
-      final DDatanode dn = dispatcher.newDatanode(r);
+      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
       for(StorageType t : StorageType.asList()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type 
@@ -294,7 +294,7 @@ public class Balancer {
           }
           g = s;
         } else {
-          g = dn.addStorageGroup(t, maxSize2Move);
+          g = dn.addTarget(t, maxSize2Move);
           if (thresholdDiff <= 0) { // within threshold
             belowAvgUtilized.add(g);
           } else {
@@ -546,15 +546,10 @@ public class Balancer {
     final Formatter formatter = new Formatter(System.out);
     System.out.println("Time Stamp               Iteration#  Bytes Already 
Moved  Bytes Left To Move  Bytes Being Moved");
     
-    final List<NameNodeConnector> connectors
-        = new ArrayList<NameNodeConnector>(namenodes.size());
+    List<NameNodeConnector> connectors = Collections.emptyList();
     try {
-      for (URI uri : namenodes) {
-        final NameNodeConnector nnc = new NameNodeConnector(
-            Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
-        nnc.getKeyManager().startBlockKeyUpdater();
-        connectors.add(nnc);
-      }
+      connectors = NameNodeConnector.newNameNodeConnectors(namenodes, 
+            Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf);
     
       boolean done = false;
       for(int iteration = 0; !done; iteration++) {
@@ -579,7 +574,7 @@ public class Balancer {
       }
     } finally {
       for(NameNodeConnector nnc : connectors) {
-        nnc.close();
+        IOUtils.cleanup(LOG, nnc);
       }
     }
     return ExitStatus.SUCCESS.getExitCode();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5aae06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index b7dceb6..bb498cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,7 +104,8 @@ public class Dispatcher {
   private final MovedBlocks<StorageGroup> movedBlocks;
 
   /** Map (datanodeUuid,storageType -> StorageGroup) */
-  private final StorageGroupMap storageGroupMap = new StorageGroupMap();
+  private final StorageGroupMap<StorageGroup> storageGroupMap
+      = new StorageGroupMap<StorageGroup>();
 
   private NetworkTopology cluster;
 
@@ -140,18 +142,18 @@ public class Dispatcher {
     }
   }
 
-  static class StorageGroupMap {
+  public static class StorageGroupMap<G extends StorageGroup> {
     private static String toKey(String datanodeUuid, StorageType storageType) {
       return datanodeUuid + ":" + storageType;
     }
 
-    private final Map<String, StorageGroup> map = new HashMap<String, StorageGroup>();
+    private final Map<String, G> map = new HashMap<String, G>();
 
-    StorageGroup get(String datanodeUuid, StorageType storageType) {
+    public G get(String datanodeUuid, StorageType storageType) {
       return map.get(toKey(datanodeUuid, storageType));
     }
 
-    void put(StorageGroup g) {
+    public void put(G g) {
+      final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType);
       final StorageGroup existing = map.put(key, g);
       Preconditions.checkState(existing == null);
@@ -167,7 +169,7 @@ public class Dispatcher {
   }
 
   /** This class keeps track of a scheduled block move */
-  private class PendingMove {
+  public class PendingMove {
     private DBlock block;
     private Source source;
     private DDatanode proxySource;
@@ -176,6 +178,12 @@ public class Dispatcher {
     private PendingMove() {
     }
 
+    public PendingMove(DBlock block, Source source, StorageGroup target) {
+      this.block = block;
+      this.source = source;
+      this.target = target;
+    }
+
     @Override
     public String toString() {
       final Block b = block.getBlock();
@@ -227,7 +235,7 @@ public class Dispatcher {
      * 
      * @return true if a proxy is found; otherwise false
      */
-    private boolean chooseProxySource() {
+    public boolean chooseProxySource() {
       final DatanodeInfo targetDN = target.getDatanodeInfo();
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
@@ -356,8 +364,8 @@ public class Dispatcher {
   }
 
   /** A class for keeping track of block locations in the dispatcher. */
-  private static class DBlock extends MovedBlocks.Locations<StorageGroup> {
-    DBlock(Block block) {
+  public static class DBlock extends MovedBlocks.Locations<StorageGroup> {
+    public DBlock(Block block) {
       super(block);
     }
   }
@@ -378,10 +386,10 @@ public class Dispatcher {
   }
 
   /** A class that keeps track of a datanode. */
-  static class DDatanode {
+  public static class DDatanode {
 
     /** A group of storages in a datanode with the same storage type. */
-    class StorageGroup {
+    public class StorageGroup {
       final StorageType storageType;
       final long maxSize2Move;
       private long scheduledSize = 0L;
@@ -390,18 +398,26 @@ public class Dispatcher {
         this.storageType = storageType;
         this.maxSize2Move = maxSize2Move;
       }
+      
+      public StorageType getStorageType() {
+        return storageType;
+      }
 
       private DDatanode getDDatanode() {
         return DDatanode.this;
       }
 
-      DatanodeInfo getDatanodeInfo() {
+      public DatanodeInfo getDatanodeInfo() {
         return DDatanode.this.datanode;
       }
 
       /** Decide if still need to move more bytes */
-      synchronized boolean hasSpaceForScheduling() {
-        return availableSizeToMove() > 0L;
+      boolean hasSpaceForScheduling() {
+        return hasSpaceForScheduling(0L);
+      }
+
+      synchronized boolean hasSpaceForScheduling(long size) {
+        return availableSizeToMove() > size;
       }
 
       /** @return the total number of bytes that need to be moved */
@@ -410,7 +426,7 @@ public class Dispatcher {
       }
 
       /** increment scheduled size */
-      synchronized void incScheduledSize(long size) {
+      public synchronized void incScheduledSize(long size) {
         scheduledSize += size;
       }
 
@@ -436,7 +452,9 @@ public class Dispatcher {
     }
 
     final DatanodeInfo datanode;
-    final EnumMap<StorageType, StorageGroup> storageMap
+    private final EnumMap<StorageType, Source> sourceMap
+        = new EnumMap<StorageType, Source>(StorageType.class);
+    private final EnumMap<StorageType, StorageGroup> targetMap
         = new EnumMap<StorageType, StorageGroup>(StorageType.class);
     protected long delayUntil = 0L;
     /** blocks being moved but not confirmed yet */
@@ -445,29 +463,34 @@ public class Dispatcher {
 
     @Override
     public String toString() {
-      return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values();
+      return getClass().getSimpleName() + ":" + datanode;
     }
 
-    private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
-      this.datanode = r.getDatanodeInfo();
+    private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
+      this.datanode = datanode;
       this.maxConcurrentMoves = maxConcurrentMoves;
       this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
     }
 
-    private void put(StorageType storageType, StorageGroup g) {
-      final StorageGroup existing = storageMap.put(storageType, g);
+    public DatanodeInfo getDatanodeInfo() {
+      return datanode;
+    }
+
+    private static <G extends StorageGroup> void put(StorageType storageType,
+        G g, EnumMap<StorageType, G> map) {
+      final StorageGroup existing = map.put(storageType, g);
       Preconditions.checkState(existing == null);
     }
 
-    StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) {
+    public StorageGroup addTarget(StorageType storageType, long maxSize2Move) {
       final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
-      put(storageType, g);
+      put(storageType, g, targetMap);
       return g;
     }
 
-    Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
+    public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
       final Source s = d.new Source(storageType, maxSize2Move, this);
-      put(storageType, s);
+      put(storageType, s, sourceMap);
       return s;
     }
 
@@ -508,7 +531,7 @@ public class Dispatcher {
   }
 
   /** A node that can be the sources of a block move */
-  class Source extends DDatanode.StorageGroup {
+  public class Source extends DDatanode.StorageGroup {
 
     private final List<Task> tasks = new ArrayList<Task>(2);
     private long blocksToReceive = 0L;
@@ -654,13 +677,7 @@ public class Dispatcher {
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
         final PendingMove p = chooseNextMove();
         if (p != null) {
-          // move the block
-          moveExecutor.execute(new Runnable() {
-            @Override
-            public void run() {
-              p.dispatch();
-            }
-          });
+          executePendingMove(p);
           continue;
         }
 
@@ -716,7 +733,8 @@ public class Dispatcher {
     this.cluster = NetworkTopology.getInstance(conf);
 
     this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
-    this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
+    this.dispatchExecutor = dispatcherThreads == 0? null
+        : Executors.newFixedThreadPool(dispatcherThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
     final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
@@ -727,11 +745,15 @@ public class Dispatcher {
         TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
   }
 
-  StorageGroupMap getStorageGroupMap() {
+  public DistributedFileSystem getDistributedFileSystem() {
+    return nnc.getDistributedFileSystem();
+  }
+
+  public StorageGroupMap<StorageGroup> getStorageGroupMap() {
     return storageGroupMap;
   }
 
-  NetworkTopology getCluster() {
+  public NetworkTopology getCluster() {
     return cluster;
   }
   
@@ -779,7 +801,7 @@ public class Dispatcher {
   }
 
   /** Get live datanode storage reports and then build the network topology. */
-  List<DatanodeStorageReport> init() throws IOException {
+  public List<DatanodeStorageReport> init() throws IOException {
     final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
     final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>();
     // create network topology and classify utilization collections:
@@ -795,8 +817,18 @@ public class Dispatcher {
     return trimmed;
   }
 
-  public DDatanode newDatanode(DatanodeStorageReport r) {
-    return new DDatanode(r, maxConcurrentMovesPerNode);
+  public DDatanode newDatanode(DatanodeInfo datanode) {
+    return new DDatanode(datanode, maxConcurrentMovesPerNode);
+  }
+
+  public void executePendingMove(final PendingMove p) {
+    // move the block
+    moveExecutor.execute(new Runnable() {
+      @Override
+      public void run() {
+        p.dispatch();
+      }
+    });
   }
 
   public boolean dispatchAndCheckContinue() throws InterruptedException {
@@ -869,6 +901,12 @@ public class Dispatcher {
     }
   }
 
+  private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
+      DBlock block) {
+    // match source and target storage type
+    return isGoodBlockCandidate(source, target, source.getStorageType(), block);
+  }
+
   /**
    * Decide if the block is a good candidate to be moved from source to target.
    * A block is a good candidate if 
@@ -876,9 +914,12 @@ public class Dispatcher {
    * 2. the block does not have a replica on the target;
    * 3. doing the move does not reduce the number of racks that the block has
    */
-  private boolean isGoodBlockCandidate(Source source, StorageGroup target,
-      DBlock block) {
-    if (source.storageType != target.storageType) {
+  public boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
+      StorageType targetStorageType, DBlock block) {
+    if (target.storageType != targetStorageType) {
+      return false;
+    }
+    if (!target.hasSpaceForScheduling(block.getNumBytes())) {
       return false;
     }
     // check if the block is moved or not
@@ -889,7 +930,7 @@ public class Dispatcher {
       return false;
     }
     if (cluster.isNodeGroupAware()
-        && isOnSameNodeGroupWithReplicas(target, block, source)) {
+        && isOnSameNodeGroupWithReplicas(source, target, block)) {
       return false;
     }
     if (reduceNumOfRacks(source, target, block)) {
@@ -902,7 +943,7 @@ public class Dispatcher {
    * Determine whether moving the given block replica from source to target
    * would reduce the number of racks of the block replicas.
    */
-  private boolean reduceNumOfRacks(Source source, StorageGroup target,
+  private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target,
       DBlock block) {
     final DatanodeInfo sourceDn = source.getDatanodeInfo();
     if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
@@ -939,8 +980,8 @@ public class Dispatcher {
    * @return true if there are any replica (other than source) on the same node
    *         group with target
    */
-  private boolean isOnSameNodeGroupWithReplicas(
-      StorageGroup target, DBlock block, Source source) {
+  private boolean isOnSameNodeGroupWithReplicas(StorageGroup source,
+      StorageGroup target, DBlock block) {
     final DatanodeInfo targetDn = target.getDatanodeInfo();
     for (StorageGroup g : block.getLocations()) {
       if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
@@ -961,7 +1002,7 @@ public class Dispatcher {
   }
 
   /** shutdown thread pools */
-  void shutdownNow() {
+  public void shutdownNow() {
     dispatchExecutor.shutdownNow();
     moveExecutor.shutdownNow();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5aae06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 820a4ed..a86023e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -51,6 +55,20 @@ public class NameNodeConnector implements Closeable {
   private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
 
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
+  
+  /** Create {@link NameNodeConnector} for the given namenodes. */
+  public static List<NameNodeConnector> newNameNodeConnectors(
+      Collection<URI> namenodes, String name, Path idPath, Configuration conf)
+      throws IOException {
+    final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+        namenodes.size());
+    for (URI uri : namenodes) {
+      NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, conf);
+      nnc.getKeyManager().startBlockKeyUpdater();
+      connectors.add(nnc);
+    }
+    return connectors;
+  }
 
   private final URI nameNodeUri;
   private final String blockpoolID;
@@ -59,7 +77,7 @@ public class NameNodeConnector implements Closeable {
   private final ClientProtocol client;
   private final KeyManager keyManager;
 
-  private final FileSystem fs;
+  private final DistributedFileSystem fs;
   private final Path idPath;
   private final OutputStream out;
 
@@ -74,7 +92,7 @@ public class NameNodeConnector implements Closeable {
         NamenodeProtocol.class).getProxy();
     this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
         ClientProtocol.class).getProxy();
-    this.fs = FileSystem.get(nameNodeUri, conf);
+    this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
     this.blockpoolID = namespaceinfo.getBlockPoolID();
@@ -89,6 +107,10 @@ public class NameNodeConnector implements Closeable {
     }
   }
 
+  public DistributedFileSystem getDistributedFileSystem() {
+    return fs;
+  }
+
   /** @return the block pool ID */
   public String getBlockpoolID() {
     return blockpoolID;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5aae06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
new file mode 100644
index 0000000..a43abf2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.text.DateFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.PendingMove;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.StorageGroupMap;
+import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
+import org.apache.hadoop.hdfs.server.balancer.Matcher;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+@InterfaceAudience.Private
+public class Mover {
+  static final Log LOG = LogFactory.getLog(Mover.class);
+
+  private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
+
+  private static class StorageMap {
+    private final StorageGroupMap<Source> sources
+        = new StorageGroupMap<Source>();
+    private final StorageGroupMap<StorageGroup> targets
+        = new StorageGroupMap<StorageGroup>();
+    private final EnumMap<StorageType, List<StorageGroup>> targetStorageTypeMap
+        = new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
+    
+    private StorageMap() {
+      for(StorageType t : StorageType.asList()) {
+        targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
+      }
+    }
+    
+    private void add(Source source, StorageGroup target) {
+      sources.put(source);
+      targets.put(target);
+      getTargetStorages(target.getStorageType()).add(target);
+    }
+    
+    private Source getSource(MLocation ml) {
+      return get(sources, ml);
+    }
+
+    private StorageGroup getTarget(MLocation ml) {
+      return get(targets, ml);
+    }
+
+    private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
+      return map.get(ml.datanode.getDatanodeUuid(), ml.storageType);
+    }
+    
+    private List<StorageGroup> getTargetStorages(StorageType t) {
+      return targetStorageTypeMap.get(t);
+    }
+  }
+
+  private final Dispatcher dispatcher;
+  private final StorageMap storages;
+
+  private final BlockStoragePolicy.Suite blockStoragePolicies;
+
+  Mover(NameNodeConnector nnc, Configuration conf) {
+    final long movedWinWidth = conf.getLong(
+        DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
+        DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
+    final int moverThreads = conf.getInt(
+        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
+        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+    final int maxConcurrentMovesPerNode = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
+    this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
+        Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
+        maxConcurrentMovesPerNode, conf);
+    this.storages = new StorageMap();
+    this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
+  }
+  
+  private ExitStatus run() {
+    try {
+      final List<DatanodeStorageReport> reports = dispatcher.init();
+      for(DatanodeStorageReport r : reports) {
+        final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+        for(StorageType t : StorageType.asList()) {
+          final long maxRemaining = getMaxRemaining(r, t);
+          if (maxRemaining > 0L) {
+            final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); 
+            final StorageGroup target = dn.addTarget(t, maxRemaining);
+            storages.add(source, target);
+          }
+        }
+      }
+
+      new Processor().processNamespace();
+
+      return ExitStatus.IN_PROGRESS;
+    } catch (IllegalArgumentException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ExitStatus.ILLEGAL_ARGUMENTS;
+    } catch (IOException e) {
+      System.out.println(e + ".  Exiting ...");
+      return ExitStatus.IO_EXCEPTION;
+    } finally {
+      dispatcher.shutdownNow();
+    }
+  }
+
+  private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) {
+    long max = 0L;
+    for(StorageReport r : report.getStorageReports()) {
+      if (r.getStorage().getStorageType() == t) {
+        if (r.getRemaining() > max) {
+          max = r.getRemaining();
+        }
+      }
+    }
+    return max;
+  }
+
+  private class Processor {
+    private final DFSClient dfs;
+  
+    private Processor() {
+      dfs = dispatcher.getDistributedFileSystem().getClient();
+    }
+      
+    private void processNamespace() {
+      try {
+        processDirRecursively("", dfs.getFileInfo("/"));
+      } catch (IOException e) {
+        LOG.warn("Failed to get root directory status. Ignore and continue.", 
e);
+      }
+    }
+
+    private void processDirRecursively(String parent, HdfsFileStatus status) {
+      if (status.isSymlink()) {
+        return; //ignore symlinks
+      } else if (status.isDir()) {
+        String dir = status.getFullName(parent);
+        if (!dir.endsWith(Path.SEPARATOR)) {
+          dir = dir + Path.SEPARATOR; 
+        }
+
+        for(byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
+          final DirectoryListing children;
+          try {
+            children = dfs.listPaths(dir, lastReturnedName, true);
+          } catch(IOException e) {
+            LOG.warn("Failed to list directory " + dir
+                + ".  Ignore the directory and continue.", e);
+            return;
+          }
+          if (children == null) {
+            return;
+          }
+          for (HdfsFileStatus child : children.getPartialListing()) {
+            processDirRecursively(dir, child);
+          }
+          if (!children.hasMore()) {
+            lastReturnedName = children.getLastName();
+          } else {
+            return;
+          }
+        }
+      } else { // file
+        processFile(parent, (HdfsLocatedFileStatus)status);
+      }
+    }
+
+    private void processFile(String parent, HdfsLocatedFileStatus status) { 
+      final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
+          status.getStoragePolicy());
+      final List<StorageType> types = policy.chooseStorageTypes(
+          status.getReplication());
+
+      final LocatedBlocks locations = status.getBlockLocations();
+      for(LocatedBlock lb : locations.getLocatedBlocks()) {
+        final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes());
+        if (!diff.removeOverlap()) {
+          scheduleMoves4Block(diff, lb);
+        }
+      }
+    }
+    
+    void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+      final List<MLocation> locations = MLocation.toLocations(lb);
+      Collections.shuffle(locations);
+      
+      final DBlock db = new DBlock(lb.getBlock().getLocalBlock());
+      for(MLocation ml : locations) {
+        db.addLocation(storages.getTarget(ml));
+      }
+
+      for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
+        final StorageType t = i.next();
+        for(final Iterator<MLocation> j = locations.iterator(); j.hasNext(); ) {
+          final MLocation ml = j.next();
+          final Source source = storages.getSource(ml); 
+          if (ml.storageType == t) {
+            // try to schedule replica move.
+            if (scheduleMoveReplica(db, ml, source, diff.expected)) {
+              i.remove();
+              j.remove();
+            }
+          }
+        }
+      }
+    }
+
+    boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
+        List<StorageType> targetTypes) {
+      if (dispatcher.getCluster().isNodeGroupAware()) {
+        if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
+          return true;
+        }
+      }
+      
+      // Then, match nodes on the same rack
+      if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) {
+        return true;
+      }
+      // At last, match all remaining nodes
+      if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) {
+        return true;
+      }
+      return false;
+    }
+
+    boolean chooseTarget(DBlock db, MLocation ml, Source source,
+        List<StorageType> targetTypes, Matcher matcher) {
+      final NetworkTopology cluster = dispatcher.getCluster(); 
+      for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
+        final StorageType t = i.next();
+        for(StorageGroup target : storages.getTargetStorages(t)) {
+          if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())
+              && dispatcher.isGoodBlockCandidate(source, target, t, db)) {
+            final PendingMove pm = dispatcher.new PendingMove(db, source, target);
+            if (pm.chooseProxySource()) {
+              i.remove();
+              target.incScheduledSize(ml.size);
+              dispatcher.executePendingMove(pm);
+              return true;
+            }
+          }
+        }
+      }
+      return false;
+    }
+  }
+  
+
+  static class MLocation {
+    final DatanodeInfo datanode;
+    final StorageType storageType;
+    final long size;
+    
+    MLocation(DatanodeInfo datanode, StorageType storageType, long size) {
+      this.datanode = datanode;
+      this.storageType = storageType;
+      this.size = size;
+    }
+    
+    static List<MLocation> toLocations(LocatedBlock lb) {
+      final DatanodeInfo[] datanodeInfos = lb.getLocations();
+      final StorageType[] storageTypes = lb.getStorageTypes();
+      final long size = lb.getBlockSize();
+      final List<MLocation> locations = new LinkedList<MLocation>();
+      for(int i = 0; i < datanodeInfos.length; i++) {
+        locations.add(new MLocation(datanodeInfos[i], storageTypes[i], size));
+      }
+      return locations;
+    }
+  }
+
+  private static class StorageTypeDiff {
+    final List<StorageType> expected;
+    final List<StorageType> existing;
+
+    StorageTypeDiff(List<StorageType> expected, StorageType[] existing) {
+      this.expected = new LinkedList<StorageType>(expected);
+      this.existing = new LinkedList<StorageType>(Arrays.asList(existing));
+    }
+    
+    /**
+     * Remove the overlap between the expected types and the existing types.
+     * @return if the existing types is empty after removed the overlap.
+     */
+    boolean removeOverlap() { 
+      for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
+        final StorageType t = i.next();
+        if (expected.remove(t)) {
+          i.remove();
+        }
+      }
+      return existing.isEmpty();
+    }
+  }
+
+  static int run(Collection<URI> namenodes, Configuration conf)
+      throws IOException, InterruptedException {
+    final long sleeptime = 2000*conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    LOG.info("namenodes = " + namenodes);
+    
+    List<NameNodeConnector> connectors = Collections.emptyList();
+    try {
+      connectors = NameNodeConnector.newNameNodeConnectors(namenodes, 
+            Mover.class.getSimpleName(), MOVER_ID_PATH, conf);
+    
+      while (true) {
+        Collections.shuffle(connectors);
+        for(NameNodeConnector nnc : connectors) {
+          final Mover m = new Mover(nnc, conf);
+          final ExitStatus r = m.run();
+
+          if (r != ExitStatus.IN_PROGRESS) {
+            //must be an error status, return.
+            return r.getExitCode();
+          }
+        }
+
+        Thread.sleep(sleeptime);
+      }
+    } finally {
+      for(NameNodeConnector nnc : connectors) {
+        IOUtils.cleanup(LOG, nnc);
+      }
+    }
+  }
+
+  static class Cli extends Configured implements Tool {
+    private static final String USAGE = "Usage: java "
+        + Mover.class.getSimpleName();
+
+    @Override
+    public int run(String[] args) throws Exception {
+      final long startTime = Time.monotonicNow();
+      final Configuration conf = getConf();
+
+      try {
+        final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+        return Mover.run(namenodes, conf);
+      } catch (IOException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ExitStatus.IO_EXCEPTION.getExitCode();
+      } catch (InterruptedException e) {
+        System.out.println(e + ".  Exiting ...");
+        return ExitStatus.INTERRUPTED.getExitCode();
+      } finally {
+        System.out.format("%-24s ", 
DateFormat.getDateTimeInstance().format(new Date()));
+        System.out.println("Mover took " + 
StringUtils.formatTime(Time.monotonicNow()-startTime));
+      }
+    }
+
+    /**
+     * Run a Mover in command line.
+     * 
+     * @param args Command line arguments
+     */
+    public static void main(String[] args) {
+      if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
+        System.exit(0);
+      }
+
+      try {
+        System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args));
+      } catch (Throwable e) {
+        LOG.error("Exiting " + Mover.class.getSimpleName()
+            + " due to an exception", e);
+        System.exit(-1);
+      }
+    }
+  }
+}
\ No newline at end of file
