HDFS-6997: Add more tests for data migration and replication.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22a41dce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22a41dce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22a41dce

Branch: refs/heads/trunk
Commit: 22a41dce4af4d5b533ba875b322551db1c152878
Parents: ba4fc93
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Sun Sep 7 07:44:28 2014 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Sun Sep 7 07:44:28 2014 +0800

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |   2 +-
 .../BlockPlacementPolicyDefault.java            | 131 +++--
 .../blockmanagement/DatanodeDescriptor.java     |  52 +-
 .../blockmanagement/DatanodeStorageInfo.java    |   2 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  |   7 +
 .../apache/hadoop/hdfs/util/EnumCounters.java   |   9 +
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |  12 +-
 .../hdfs/server/mover/TestStorageMover.java     | 477 +++++++++++++++++--
 8 files changed, 579 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
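
The core refactoring in this patch replaces DatanodeDescriptor's single pair of scheduled-block counters with per-StorageType EnumCounters, so the namenode tracks pending writes per storage type and block placement can check space per type. Below is a minimal, self-contained sketch of that counter pattern for illustration only; it is not part of the patch, and the enum and class names are simplified stand-ins for the Hadoop types (StorageType, EnumCounters).

// Illustrative sketch only, mirroring the curr/prevApproxBlocksScheduled change.
import java.util.EnumMap;

public class ScheduledBlockCounters {
  // Simplified stand-in for org.apache.hadoop.hdfs.StorageType.
  enum StorageType { DISK, SSD, ARCHIVE }

  private final EnumMap<StorageType, Long> curr = new EnumMap<>(StorageType.class);
  private final EnumMap<StorageType, Long> prev = new EnumMap<>(StorageType.class);

  /** Approximate number of blocks scheduled for one storage type. */
  long get(StorageType t) {
    return curr.getOrDefault(t, 0L) + prev.getOrDefault(t, 0L);
  }

  void increment(StorageType t) {
    curr.merge(t, 1L, Long::sum);
  }

  void decrement(StorageType t) {
    // Drain the previous interval's count first, as the patch does.
    if (prev.getOrDefault(t, 0L) > 0) {
      prev.merge(t, -1L, Long::sum);
    } else if (curr.getOrDefault(t, 0L) > 0) {
      curr.merge(t, -1L, Long::sum);
    }
    // It is fine if both counters are already zero.
  }

  /** Sum over all storage types, like the new EnumCounters#sum(). */
  long sum() {
    long total = 0;
    for (StorageType t : StorageType.values()) {
      total += get(t);
    }
    return total;
  }

  /** Roll counters periodically, as rollBlocksScheduled() does. */
  void roll() {
    prev.clear();
    prev.putAll(curr);
    curr.clear();
  }
}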


http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index af83653..956900d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2930,7 +2930,7 @@ public class BlockManager {
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
     // RECEIVED_BLOCK), we currently also decrease the approximate number. 
-    node.decrementBlocksScheduled();
+    node.decrementBlocksScheduled(storageInfo.getStorageType());
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 593ea90..a0e6701 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -76,12 +76,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    */
   protected int tolerateHeartbeatMultiplier;
 
-  protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
-                           NetworkTopology clusterMap, 
-                           Host2NodesMap host2datanodeMap) {
-    initialize(conf, stats, clusterMap, host2datanodeMap);
-  }
-
   protected BlockPlacementPolicyDefault() {
   }
     
@@ -174,6 +168,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       return getPipeline(writer,
           results.toArray(new DatanodeStorageInfo[results.size()]));
     } catch (NotEnoughReplicasException nr) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose with favored nodes (=" + favoredNodes
+            + "), disregard favored nodes hint and retry.", nr);
+      }
       // Fall back to regular block placement disregarding favored nodes hint
       return chooseTarget(src, numOfReplicas, writer, 
           new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, 
@@ -291,6 +289,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             unavailableStorages, newBlock);
     final EnumMap<StorageType, Integer> storageTypes =
         getRequiredStorageTypes(requiredStorageTypes);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("storageTypes=" + storageTypes);
+    }
 
     try {
       if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
@@ -337,7 +338,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     } catch (NotEnoughReplicasException e) {
       final String message = "Failed to place enough replicas, still in need of "
           + (totalReplicasExpected - results.size()) + " to reach "
-          + totalReplicasExpected + ".";
+          + totalReplicasExpected
+          + " (unavailableStorages=" + unavailableStorages
+          + ", storagePolicy=" + storagePolicy
+          + ", newBlock=" + newBlock + ")";
+
       if (LOG.isTraceEnabled()) {
         LOG.trace(message, e);
       } else {
@@ -466,39 +471,59 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
+    final String localRack = localMachine.getNetworkLocation();
       
-    // choose one from the local rack
     try {
-      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+      // choose one from the local rack
+      return chooseRandom(localRack, excludedNodes,
           blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
-    } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
+    } catch (NotEnoughReplicasException e) {
+      // find the next replica and retry with its rack
       for(DatanodeStorageInfo resultStorage : results) {
         DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
         if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
-      if (newLocal != null) {
-        try {
-          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes,
-              storageTypes);
-        } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to choose from local rack (location = " + 
localRack
+                + "), retry with the rack of the next replica (location = "
+                + nextNode.getNetworkLocation() + ")", e);
+          }
+          return chooseFromNextRack(nextNode, excludedNodes, blocksize,
               maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
-      } else {
-        //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from local rack (location = " + localRack
+            + "); the second replica is not found, retry choosing ramdomly", 
e);
+      }
+      //the second replica is not found, randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
-    
+
+  private DatanodeStorageInfo chooseFromNextRack(Node next,
+      Set<Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes) throws 
NotEnoughReplicasException {
+    final String nextRack = next.getNetworkLocation();
+    try {
+      return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack,
+          results, avoidStaleNodes, storageTypes);
+    } catch(NotEnoughReplicasException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from the next rack (location = " + nextRack
+            + "), retry choosing ramdomly", e);
+      }
+      //otherwise randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    }
+  }
+
   /** 
    * Choose <i>numOfReplicas</i> nodes from the racks 
    * that <i>localMachine</i> is NOT on.
@@ -522,6 +547,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           excludedNodes, blocksize, maxReplicasPerRack, results,
           avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose remote rack (location = ~"
+            + localMachine.getNetworkLocation() + "), fallback to local rack", 
e);
+      }
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
                    maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
@@ -572,6 +601,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       DatanodeDescriptor chosenNode = 
           (DatanodeDescriptor)clusterMap.chooseRandom(scope);
       if (excludedNodes.add(chosenNode)) { //was not in the excluded list
+        if (LOG.isDebugEnabled()) {
+          builder.append("\nNode 
").append(NodeBase.getPath(chosenNode)).append(" [");
+        }
         numOfAvailableNodes--;
 
         final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
@@ -603,6 +635,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             }
           }
         }
+        if (LOG.isDebugEnabled()) {
+          builder.append("\n]");
+        }
 
         // If no candidate storage was found on this DN then set badTarget.
         badTarget = (i == storages.length);
@@ -613,9 +648,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       String detail = enableDebugLogging;
       if (LOG.isDebugEnabled()) {
         if (badTarget && builder != null) {
-          detail = builder.append("]").toString();
+          detail = builder.toString();
           builder.setLength(0);
-        } else detail = "";
+        } else {
+          detail = "";
+        }
       }
       throw new NotEnoughReplicasException(detail);
     }
@@ -649,14 +686,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
   private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
     if (LOG.isDebugEnabled()) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       // build the error message for later use.
       debugLoggingBuilder.get()
-          .append(node).append(": ")
-          .append("Storage ").append(storage)
-          .append("at node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because ")
-          .append(reason);
+          .append("\n  Storage ").append(storage)
+          .append(" is not chosen since ").append(reason).append(".");
     }
   }
 
@@ -681,11 +714,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                boolean considerLoad,
                                List<DatanodeStorageInfo> results,
                                boolean avoidStaleNodes,
-                               StorageType storageType) {
-    if (storage.getStorageType() != storageType) {
-      logNodeIsNotChosen(storage,
-          "storage types do not match, where the expected storage type is "
-              + storageType);
+                               StorageType requiredStorageType) {
+    if (storage.getStorageType() != requiredStorageType) {
+      logNodeIsNotChosen(storage, "storage types do not match,"
+          + " where the required storage type is " + requiredStorageType);
       return false;
     }
     if (storage.getState() == State.READ_ONLY_SHARED) {
@@ -707,9 +739,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     
     final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
-    final long scheduledSize = blockSize * node.getBlocksScheduled();
-    if (requiredSize > node.getRemaining() - scheduledSize) {
-      logNodeIsNotChosen(storage, "the node does not have enough space ");
+    final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
+    final long remaining = node.getRemaining(storage.getStorageType());
+    if (requiredSize > remaining - scheduledSize) {
+      logNodeIsNotChosen(storage, "the node does not have enough "
+          + storage.getStorageType() + " space"
+          + " (required=" + requiredSize
+          + ", scheduled=" + scheduledSize
+          + ", remaining=" + remaining + ")");
       return false;
     }
 
@@ -718,8 +755,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
       final int nodeLoad = node.getXceiverCount();
       if (nodeLoad > maxLoad) {
-        logNodeIsNotChosen(storage,
-            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
+        logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
+            + " > " + maxLoad + ") ");
         return false;
       }
     }
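
The isGoodTarget() hunk above makes the free-space check storage-type aware: the required size is compared against the remaining capacity of the candidate's StorageType minus the space already scheduled for that type. A rough, self-contained sketch of that arithmetic follows; the helper names and the MIN_BLOCKS_FOR_WRITE value are illustrative assumptions, not the Hadoop API.

// Illustrative sketch of the per-storage-type space check; not part of the patch.
final class SpaceCheckSketch {
  // Placeholder for HdfsConstants.MIN_BLOCKS_FOR_WRITE (actual value not shown in the diff).
  static final int MIN_BLOCKS_FOR_WRITE = 5;

  /** True if a storage of the given type can accept another block of blockSize. */
  static boolean hasEnoughSpace(long blockSize, long remainingForType,
      long blocksScheduledForType) {
    final long requiredSize = blockSize * MIN_BLOCKS_FOR_WRITE;
    final long scheduledSize = blockSize * blocksScheduledForType;
    return requiredSize <= remainingForType - scheduledSize;
  }

  public static void main(String[] args) {
    // Example: 1 KB blocks, 4 KB remaining on ARCHIVE, 2 blocks already scheduled
    // => 5 KB required vs 2 KB effectively free, so the storage is rejected.
    System.out.println(hasEnoughSpace(1024, 4 * 1024, 2));  // prints false
  }
}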

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 34be727..55599f7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -28,16 +28,19 @@ import java.util.Map;
 import java.util.Queue;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
@@ -202,8 +205,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
-  private int currApproxBlocksScheduled = 0;
-  private int prevApproxBlocksScheduled = 0;
+  private EnumCounters<StorageType> currApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
+  private EnumCounters<StorageType> prevApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
@@ -476,23 +481,46 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   /**
    * @return Approximate number of blocks currently scheduled to be written 
+   */
+  public long getRemaining(StorageType t) {
+    long remaining = 0;
+    for(DatanodeStorageInfo s : getStorageInfos()) {
+      if (s.getStorageType() == t) {
+        remaining += s.getRemaining();
+      }
+    }
+    return remaining;    
+  }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written 
+   * to the given storage type of this datanode.
+   */
+  public int getBlocksScheduled(StorageType t) {
+    return (int)(currApproxBlocksScheduled.get(t)
+        + prevApproxBlocksScheduled.get(t));
+  }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written 
    * to this datanode.
    */
   public int getBlocksScheduled() {
-    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
+    return (int)(currApproxBlocksScheduled.sum()
+        + prevApproxBlocksScheduled.sum());
   }
 
   /** Increment the number of blocks scheduled. */
-  void incrementBlocksScheduled() {
-    currApproxBlocksScheduled++;
+  void incrementBlocksScheduled(StorageType t) {
+    currApproxBlocksScheduled.add(t, 1);
   }
   
   /** Decrement the number of blocks scheduled. */
-  void decrementBlocksScheduled() {
-    if (prevApproxBlocksScheduled > 0) {
-      prevApproxBlocksScheduled--;
-    } else if (currApproxBlocksScheduled > 0) {
-      currApproxBlocksScheduled--;
+  void decrementBlocksScheduled(StorageType t) {
+    if (prevApproxBlocksScheduled.get(t) > 0) {
+      prevApproxBlocksScheduled.subtract(t, 1);
+    } else if (currApproxBlocksScheduled.get(t) > 0) {
+      currApproxBlocksScheduled.subtract(t, 1);
     } 
     // its ok if both counters are zero.
   }
@@ -500,8 +528,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** Adjusts curr and prev number of blocks scheduled every few minutes. */
   private void rollBlocksScheduled(long now) {
     if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
-      prevApproxBlocksScheduled = currApproxBlocksScheduled;
-      currApproxBlocksScheduled = 0;
+      prevApproxBlocksScheduled.set(currApproxBlocksScheduled);
+      currApproxBlocksScheduled.reset();
       lastBlocksScheduledRollTime = now;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 9f07a38..04f17ac 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -283,7 +283,7 @@ public class DatanodeStorageInfo {
   /** Increment the number of blocks scheduled for each given storage */ 
   public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) {
     for (DatanodeStorageInfo s : storages) {
-      s.getDatanodeDescriptor().incrementBlocksScheduled();
+      s.getDatanodeDescriptor().incrementBlocksScheduled(s.getStorageType());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 2bb1317..57ad6aa 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.mover;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -432,6 +433,12 @@ public class Mover {
       }
       return expected.isEmpty() || existing.isEmpty();
     }
+    
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "{expected=" + expected
+          + ", existing=" + existing + "}";
+    }
   }
 
   static int run(Collection<URI> namenodes, Configuration conf)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
index 8bdea1f..8a8e61f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
@@ -105,6 +105,15 @@ public class EnumCounters<E extends Enum<E>> {
       this.counters[i] -= that.counters[i];
     }
   }
+  
+  /** @return the sum of all counters. */
+  public final long sum() {
+    long sum = 0;
+    for(int i = 0; i < counters.length; i++) {
+      sum += counters[i];
+    }
+    return sum;
+  }
 
   @Override
   public boolean equals(Object obj) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 0e49cfe..0512b7f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1474,19 +1474,21 @@ public class MiniDFSCluster {
           secureResources, dn.getIpcPort()));
       dns[i - curDatanodesNum] = dn;
     }
-    curDatanodesNum += numDataNodes;
     this.numDataNodes += numDataNodes;
     waitActive();
-
+    
     if (storageCapacities != null) {
       for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
-        List<? extends FsVolumeSpi> volumes = 
dns[i].getFSDataset().getVolumes();
-        assert storageCapacities[i].length == storagesPerDatanode;
+        final int index = i - curDatanodesNum;
+        List<? extends FsVolumeSpi> volumes = 
dns[index].getFSDataset().getVolumes();
+        assert storageCapacities[index].length == storagesPerDatanode;
         assert volumes.size() == storagesPerDatanode;
 
         for (int j = 0; j < volumes.size(); ++j) {
           FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
-          volume.setCapacityForTesting(storageCapacities[i][j]);
+          LOG.info("setCapacityForTesting "  + storageCapacities[index][j]
+              + " for [" + volume.getStorageType() + "]" + 
volume.getStorageID());
+          volume.setCapacityForTesting(storageCapacities[index][j]);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22a41dce/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
index d2a7fcc..88b3992 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -17,30 +17,58 @@
  */
 package org.apache.hadoop.hdfs.server.mover;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.net.URI;
-import java.util.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  * Test the data migration tool (for Archival Storage)
  */
 public class TestStorageMover {
-  private static final long BLOCK_SIZE = 1024;
+  static final Log LOG = LogFactory.getLog(TestStorageMover.class);
+  static {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)
+        ).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(Dispatcher.class)
+        ).getLogger().setLevel(Level.ALL);
+  }
+
+  private static final int BLOCK_SIZE = 1024;
   private static final short REPL = 3;
   private static final int NUM_DATANODES = 6;
   private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
@@ -50,12 +78,15 @@ public class TestStorageMover {
   private static final BlockStoragePolicy COLD;
 
   static {
-    DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(new
-        HdfsConfiguration());
+    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
+
+    DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF);
     HOT = DEFAULT_POLICIES.getPolicy("HOT");
     WARM = DEFAULT_POLICIES.getPolicy("WARM");
     COLD = DEFAULT_POLICIES.getPolicy("COLD");
-    Dispatcher.setBlockMoveWaitTime(10 * 1000);
+    Dispatcher.setBlockMoveWaitTime(1000L);
   }
 
   /**
@@ -63,17 +94,48 @@ public class TestStorageMover {
    * also defines snapshots.
    */
   static class NamespaceScheme {
+    final List<Path> dirs;
     final List<Path> files;
+    final long fileSize;
     final Map<Path, List<String>> snapshotMap;
     final Map<Path, BlockStoragePolicy> policyMap;
 
-    NamespaceScheme(List<Path> files, Map<Path,List<String>> snapshotMap,
+    NamespaceScheme(List<Path> dirs, List<Path> files, long fileSize, 
+                    Map<Path,List<String>> snapshotMap,
                     Map<Path, BlockStoragePolicy> policyMap) {
-      this.files = files;
+      this.dirs = dirs == null? Collections.<Path>emptyList(): dirs;
+      this.files = files == null? Collections.<Path>emptyList(): files;
+      this.fileSize = fileSize;
       this.snapshotMap = snapshotMap == null ?
-          new HashMap<Path, List<String>>() : snapshotMap;
+          Collections.<Path, List<String>>emptyMap() : snapshotMap;
       this.policyMap = policyMap;
     }
+
+    /**
+     * Create files/directories/snapshots.
+     */
+    void prepare(DistributedFileSystem dfs, short repl) throws Exception {
+      for (Path d : dirs) {
+        dfs.mkdirs(d);
+      }
+      for (Path file : files) {
+        DFSTestUtil.createFile(dfs, file, fileSize, repl, 0L);
+      }
+      for (Map.Entry<Path, List<String>> entry : snapshotMap.entrySet()) {
+        for (String snapshot : entry.getValue()) {
+          SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
+        }
+      }
+    }
+
+    /**
+     * Set storage policies according to the corresponding scheme.
+     */
+    void setStoragePolicy(DistributedFileSystem dfs) throws Exception {
+      for (Map.Entry<Path, BlockStoragePolicy> entry : policyMap.entrySet()) {
+        dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
+      }
+    }
   }
 
   /**
@@ -87,6 +149,11 @@ public class TestStorageMover {
     final StorageType[][] storageTypes;
     final long[][] storageCapacities;
 
+    ClusterScheme() {
+      this(DEFAULT_CONF, NUM_DATANODES, REPL,
+          genStorageTypes(NUM_DATANODES, 1, 1), null);
+    }
+
     ClusterScheme(Configuration conf, int numDataNodes, short repl,
         StorageType[][] types, long[][] capacities) {
       Preconditions.checkArgument(types == null || types.length == numDataNodes);
@@ -128,6 +195,22 @@ public class TestStorageMover {
       dfs = cluster.getFileSystem();
     }
 
+    private void runBasicTest(boolean shutdown) throws Exception {
+      setupCluster();
+      try {
+        prepareNamespace();
+        verify(true);
+
+        setStoragePolicy();
+        migrate();
+        verify(true);
+      } finally {
+        if (shutdown) {
+          shutdownCluster();
+        }
+      }
+    }
+
     void shutdownCluster() throws Exception {
       IOUtils.cleanup(null, dfs);
       if (cluster != null) {
@@ -140,18 +223,11 @@ public class TestStorageMover {
      * corresponding scheme.
      */
     void prepareNamespace() throws Exception {
-      for (Path file : nsScheme.files) {
-        DFSTestUtil.createFile(dfs, file, BLOCK_SIZE * 2, clusterScheme.repl,
-            0L);
-      }
-      for (Map.Entry<Path, List<String>> entry : 
nsScheme.snapshotMap.entrySet()) {
-        for (String snapshot : entry.getValue()) {
-          SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
-        }
-      }
-      for (Map.Entry<Path, BlockStoragePolicy> entry : 
nsScheme.policyMap.entrySet()) {
-        dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
-      }
+      nsScheme.prepare(dfs, clusterScheme.repl);
+    }
+
+    void setStoragePolicy() throws Exception {
+      nsScheme.setStoragePolicy(dfs);
     }
 
     /**
@@ -159,6 +235,7 @@ public class TestStorageMover {
      */
     void migrate(String... args) throws Exception {
       runMover();
+      Thread.sleep(5000); // let the NN finish deletion
     }
 
     /**
@@ -195,38 +272,128 @@ public class TestStorageMover {
           verifyRecursively(fullPath, child);
         }
       } else if (!status.isSymlink()) { // is file
-        HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
-        byte policyId = fileStatus.getStoragePolicy();
-        BlockStoragePolicy policy = policies.getPolicy(policyId);
-        final List<StorageType> types = policy.chooseStorageTypes(
-            status.getReplication());
-        for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
-          final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
-              lb.getStorageTypes());
-          Assert.assertTrue(diff.removeOverlap());
+        verifyFile(parent, status, null);
+      }
+    }
+
+    private void verifyFile(final Path parent, final HdfsFileStatus status,
+        final Byte expectedPolicyId) throws Exception {
+      HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
+      byte policyId = fileStatus.getStoragePolicy();
+      BlockStoragePolicy policy = policies.getPolicy(policyId);
+      if (expectedPolicyId != null) {
+        Assert.assertEquals(expectedPolicyId, policy);
+      }
+      final List<StorageType> types = policy.chooseStorageTypes(
+          status.getReplication());
+      for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
+        final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
+            lb.getStorageTypes());
+        Assert.assertTrue(fileStatus.getFullName(parent.toString())
+            + " with policy " + policy + " has non-empty overlap: " + diff,
+            diff.removeOverlap());
+      }
+    }
+    
+    Replication getReplication(Path file) throws IOException {
+      return getOrVerifyReplication(file, null);
+    }
+
+    Replication verifyReplication(Path file, int expectedDiskCount,
+        int expectedArchiveCount) throws IOException {
+      final Replication r = new Replication();
+      r.disk = expectedDiskCount;
+      r.archive = expectedArchiveCount;
+      return getOrVerifyReplication(file, r);
+    }
+
+    private Replication getOrVerifyReplication(Path file, Replication expected)
+        throws IOException {
+      final List<LocatedBlock> lbs = dfs.getClient().getLocatedBlocks(
+          file.toString(), 0).getLocatedBlocks();
+      Assert.assertEquals(1, lbs.size());
+
+      LocatedBlock lb = lbs.get(0);
+      StringBuilder types = new StringBuilder(); 
+      final Replication r = new Replication();
+      for(StorageType t : lb.getStorageTypes()) {
+        types.append(t).append(", ");
+        if (t == StorageType.DISK) {
+          r.disk++;
+        } else if (t == StorageType.ARCHIVE) {
+          r.archive++;
+        } else {
+          Assert.fail("Unexpected storage type " + t);
         }
       }
+
+      if (expected != null) {
+        final String s = "file = " + file + "\n  types = [" + types + "]";
+        Assert.assertEquals(s, expected, r);
+      }
+      return r;
     }
   }
 
+  static class Replication {
+    int disk;
+    int archive;
+    
+    @Override
+    public int hashCode() {
+      return disk ^ archive;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (obj == null || !(obj instanceof Replication)) {
+        return false;
+      }
+      final Replication that = (Replication)obj;
+      return this.disk == that.disk && this.archive == that.archive;
+    }
+    
+    @Override
+    public String toString() {
+      return "[disk=" + disk + ", archive=" + archive + "]";
+    }
+  }
   private static StorageType[][] genStorageTypes(int numDataNodes) {
+    return genStorageTypes(numDataNodes, 0, 0);
+  }
+
+  private static StorageType[][] genStorageTypes(int numDataNodes,
+      int numAllDisk, int numAllArchive) {
     StorageType[][] types = new StorageType[numDataNodes][];
-    for (int i = 0; i < types.length; i++) {
+    int i = 0;
+    for (; i < numAllDisk; i++) {
+      types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
+    }
+    for (; i < numAllDisk + numAllArchive; i++) {
+      types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
+    }
+    for (; i < types.length; i++) {
       types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
     }
     return types;
   }
-
-  private void runTest(MigrationTest test) throws Exception {
-    test.setupCluster();
-    try {
-      test.prepareNamespace();
-      test.migrate();
-      Thread.sleep(5000); // let the NN finish deletion
-      test.verify(true);
-    } finally {
-      test.shutdownCluster();
+  
+  private static long[][] genCapacities(int nDatanodes, int numAllDisk,
+      int numAllArchive, long diskCapacity, long archiveCapacity) {
+    final long[][] capacities = new long[nDatanodes][];
+    int i = 0;
+    for (; i < numAllDisk; i++) {
+      capacities[i] = new long[]{diskCapacity, diskCapacity};
     }
+    for (; i < numAllDisk + numAllArchive; i++) {
+      capacities[i] = new long[]{archiveCapacity, archiveCapacity};
+    }
+    for(; i < capacities.length; i++) {
+      capacities[i] = new long[]{diskCapacity, archiveCapacity};
+    }
+    return capacities;
   }
 
   /**
@@ -237,11 +404,227 @@ public class TestStorageMover {
     final Path foo = new Path("/foo");
     Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
     policyMap.put(foo, COLD);
-    NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo), null,
-        policyMap);
+    NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo),
+        2*BLOCK_SIZE, null, policyMap);
     ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
         NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
+    new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
+  }
+
+  private static class PathPolicyMap {
+    final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
+    final Path hot = new Path("/hot");
+    final Path warm = new Path("/warm");
+    final Path cold = new Path("/cold");
+    final List<Path> files;
+
+    PathPolicyMap(int filesPerDir){
+      map.put(hot, HOT);
+      map.put(warm, WARM);
+      map.put(cold, COLD);
+      files = new ArrayList<Path>();
+      for(Path dir : map.keySet()) {
+        for(int i = 0; i < filesPerDir; i++) {
+          files.add(new Path(dir, "file" + i));
+        }
+      }
+    }
+    
+    NamespaceScheme newNamespaceScheme() {
+      return new NamespaceScheme(Arrays.asList(hot, warm, cold),
+          files, BLOCK_SIZE/2, null, map);
+    }
+    
+    /** 
+     * Move hot files to warm and cold, warm files to hot and cold,
+     * and cold files to hot and warm.
+     */
+    void moveAround(DistributedFileSystem dfs) throws Exception {
+      for(Path srcDir : map.keySet()) {
+        int i = 0;
+        for(Path dstDir : map.keySet()) {
+          if (!srcDir.equals(dstDir)) {
+            final Path src = new Path(srcDir, "file" + i++);
+            final Path dst = new Path(dstDir, srcDir.getName() + "2" + dstDir.getName());
+            LOG.info("rename " + src + " to " + dst);
+            dfs.rename(src, dst);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Test directories with Hot, Warm and Cold polices.
+   */
+  @Test
+  public void testHotWarmColdDirs() throws Exception {
+    PathPolicyMap pathPolicyMap = new PathPolicyMap(3);
+    NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
+    ClusterScheme clusterScheme = new ClusterScheme();
     MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
-    runTest(test);
+
+    test.runBasicTest(false);
+
+    pathPolicyMap.moveAround(test.dfs);
+    test.migrate();
+    test.verify(true);
+    test.shutdownCluster();
+  }
+
+  /**
+   * Test when DISK is running out of space.
+   */
+  @Test
+  public void testNoSpaceDisk() throws Exception {
+    final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
+    final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
+
+    final long diskCapacity = (3 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
+    final long archiveCapacity = 100*BLOCK_SIZE;
+    final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
+        diskCapacity, archiveCapacity);
+    final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
+    final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
+
+    test.runBasicTest(false);
+
+    // create hot files with replication 3 until no more space is left.
+    final short replication = 3;
+    {
+      int hotFileCount = 0;
+      try {
+        for(; ; hotFileCount++) {
+          final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount);
+          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+        }
+      } catch(IOException e) {
+        LOG.info("Expected: hotFileCount=" + hotFileCount, e);
+      }
+      Assert.assertTrue(hotFileCount >= 2);
+    }
+
+    // create hot files with replication 1 to use up all remaining space.
+    {
+      int hotFileCount_r1 = 0;
+      try {
+        for(; ; hotFileCount_r1++) {
+          final Path p = new Path(pathPolicyMap.hot, "file_r1_" + 
hotFileCount_r1);
+          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L);
+        }
+      } catch(IOException e) {
+        LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
+      }
+    }
+
+    { // test increasing replication.  Since DISK is full,
+      // new replicas should be stored in ARCHIVE as a fallback storage.
+      final Path file0 = new Path(pathPolicyMap.hot, "file0");
+      final Replication r = test.getReplication(file0);
+      LOG.info("XXX " + file0 + ": replication=" + r);
+      final short newReplication = (short)5;
+      test.dfs.setReplication(file0, newReplication);
+//      DFSTestUtil.waitReplication(test.dfs, file0, newReplication);
+      Thread.sleep(10000);
+      test.verifyReplication(file0, r.disk, newReplication - r.disk);
+    }
+
+    { // test creating a cold file and then increase replication
+      final Path p = new Path(pathPolicyMap.cold, "foo");
+      DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+      test.verifyReplication(p, 0, replication);
+
+      final short newReplication = 5;
+      test.dfs.setReplication(p, newReplication);
+//      DFSTestUtil.waitReplication(test.dfs, p, newReplication);
+      Thread.sleep(10000);
+      test.verifyReplication(p, 0, newReplication);
+    }
+
+    { //test move a hot file to warm
+      //TODO: fix Mover not terminating in the test below
+//      final Path file1 = new Path(pathPolicyMap.hot, "file1");
+//      test.dfs.rename(file1, pathPolicyMap.warm);
+//      test.migrate();
+//      test.verify(true);
+    }
+
+    test.shutdownCluster();
+  }
+
+  /**
+   * Test when ARCHIVE is running out of space.
+   */
+  @Test
+  public void testNoSpaceArchive() throws Exception {
+    final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
+    final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
+
+    final long diskCapacity = 100*BLOCK_SIZE;
+    final long archiveCapacity = (2 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
+    final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
+        diskCapacity, archiveCapacity);
+    final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
+    final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
+
+    test.runBasicTest(false);
+
+    // create cold files with replication 3 until no more space is left.
+    final short replication = 3;
+    {
+      int coldFileCount = 0;
+      try {
+        for(; ; coldFileCount++) {
+          final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount);
+          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+        }
+      } catch(IOException e) {
+        LOG.info("Expected: coldFileCount=" + coldFileCount, e);
+      }
+      Assert.assertTrue(coldFileCount >= 2);
+    }
+
+    // create cold files with replication 1 to use up all remaining space.
+    {
+      int coldFileCount_r1 = 0;
+      try {
+        for(; ; coldFileCount_r1++) {
+          final Path p = new Path(pathPolicyMap.cold, "file_r1_" + 
coldFileCount_r1);
+          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L);
+        }
+      } catch(IOException e) {
+        LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
+      }
+    }
+
+    { // test increasing replication but new replicas cannot be created
+      // since no more ARCHIVE space.
+      final Path file0 = new Path(pathPolicyMap.cold, "file0");
+      final Replication r = test.getReplication(file0);
+      LOG.info("XXX " + file0 + ": replication=" + r);
+      Assert.assertEquals(0, r.disk);
+
+      final short newReplication = (short)5;
+      test.dfs.setReplication(file0, newReplication);
+      Thread.sleep(10000);
+
+      test.verifyReplication(file0, 0, r.archive);
+    }
+
+    { // test creating a hot file
+      final Path p = new Path(pathPolicyMap.hot, "foo");
+      DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)3, 0L);
+    }
+
+    { //test move a cold file to warm
+      final Path file1 = new Path(pathPolicyMap.hot, "file1");
+      test.dfs.rename(file1, pathPolicyMap.warm);
+      test.migrate();
+      test.verify(true);
+    }
+
+    test.shutdownCluster();
   }
 }
