HDFS-7034. Archival Storage: Fix TestBlockPlacement and TestStorageMover. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d85f7e5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d85f7e5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d85f7e5 Branch: refs/heads/HDFS-6581 Commit: 0d85f7e59146cc3e9a040c2203995f3efd8ed4eb Parents: 70dfe9c Author: Jing Zhao <ji...@apache.org> Authored: Thu Sep 11 13:00:43 2014 -0700 Committer: Jing Zhao <ji...@apache.org> Committed: Thu Sep 11 13:00:43 2014 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +- .../hadoop/hdfs/server/balancer/Dispatcher.java | 27 +- .../hdfs/server/balancer/NameNodeConnector.java | 20 +- .../server/blockmanagement/BlockManager.java | 4 +- .../hadoop/hdfs/server/datanode/DataNode.java | 4 +- .../apache/hadoop/hdfs/server/mover/Mover.java | 10 +- .../hadoop/hdfs/TestBlockStoragePolicy.java | 8 +- .../hdfs/server/mover/TestStorageMover.java | 286 ++++++++++--------- 8 files changed, 208 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f3edbf5..641fb52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -372,7 +372,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000; public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads"; public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200; - + public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 98bd58e..f2a1299 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.StorageType; @@ -88,7 +89,11 @@ public class Dispatcher { private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB; private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; - private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds + /** + * the period of time to delay the usage of a DataNode after hitting + * errors when using it for migrating data + */ + private static long delayAfterErrors = 10 * 1000; private final NameNodeConnector nnc; private final SaslDataTransferClient saslClient; @@ -112,6 +117,7 @@ public class Dispatcher { private final ExecutorService moveExecutor; private final ExecutorService dispatchExecutor; + /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; @@ -187,10 +193,12 @@ public class Dispatcher { @Override public String toString() { - final Block b = block.getBlock(); - return b + " with size=" + b.getNumBytes() + " from " - + source.getDisplayName() + " to " + target.getDisplayName() - + " through " + proxySource.datanode; + final Block b = block != null ? block.getBlock() : null; + String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ") + : " "; + return bStr + "from " + source.getDisplayName() + " to " + target + .getDisplayName() + " through " + (proxySource != null ? proxySource + .datanode : ""); } /** @@ -316,8 +324,8 @@ public class Dispatcher { // further in order to avoid a potential storm of "threads quota // exceeded" warnings when the dispatcher gets out of sync with work // going on in datanodes. - proxySource.activateDelay(DELAY_AFTER_ERROR); - target.getDDatanode().activateDelay(DELAY_AFTER_ERROR); + proxySource.activateDelay(delayAfterErrors); + target.getDDatanode().activateDelay(delayAfterErrors); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); @@ -1043,6 +1051,11 @@ public class Dispatcher { blockMoveWaitTime = time; } + @VisibleForTesting + public static void setDelayAfterErrors(long time) { + delayAfterErrors = time; + } + /** shutdown thread pools */ public void shutdownNow() { if (dispatchExecutor != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 79815c0..9e08d51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -25,13 +25,16 @@ import java.net.InetAddress; import java.net.URI; import java.util.*; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; @@ -53,6 +56,7 @@ public class NameNodeConnector implements Closeable { private static final Log LOG = LogFactory.getLog(NameNodeConnector.class); private static final int MAX_NOT_CHANGED_ITERATIONS = 5; + private static boolean createIdFile = true; /** Create {@link NameNodeConnector} for the given namenodes. */ public static List<NameNodeConnector> newNameNodeConnectors( @@ -83,6 +87,11 @@ public class NameNodeConnector implements Closeable { return connectors; } + @VisibleForTesting + public static void setCreateIdFile(boolean create) { + createIdFile = create; + } + private final URI nameNodeUri; private final String blockpoolID; @@ -117,9 +126,10 @@ public class NameNodeConnector implements Closeable { final FsServerDefaults defaults = fs.getServerDefaults(new Path("/")); this.keyManager = new KeyManager(blockpoolID, namenode, defaults.getEncryptDataTransfer(), conf); - // Exit if there is another one running. - out = checkAndMarkRunning(); - if (out == null) { + // if it is for test, we do not create the id file + out = createIdFile ? checkAndMarkRunning() : null; + if (createIdFile && out == null) { + // Exit if there is another one running. throw new IOException("Another " + name + " is running."); } } @@ -188,9 +198,9 @@ public class NameNodeConnector implements Closeable { */ private OutputStream checkAndMarkRunning() throws IOException { try { - final DataOutputStream out = fs.create(idPath); + final FSDataOutputStream out = fs.create(idPath); out.writeBytes(InetAddress.getLocalHost().getHostName()); - out.flush(); + out.hflush(); return out; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 956900d..cb303a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1502,7 +1502,7 @@ public class BlockManager { * @throws IOException * if the number of targets < minimum replication. * @see BlockPlacementPolicy#chooseTarget(String, int, Node, - * List, boolean, Set, long, StorageType) + * Set, long, List, BlockStoragePolicy) */ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src, final int numOfReplicas, final DatanodeDescriptor client, @@ -2811,7 +2811,7 @@ public class BlockManager { return false; // only consider delHint for the first case } else if (delHint == null) { return false; // no delHint - } else if (!excessTypes.remove(delHint.getStorageType())) { + } else if (!excessTypes.contains(delHint.getStorageType())) { return false; // delHint storage type is not an excess type } else { // check if removing delHint reduces the number of racks http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 3810621..9b57262 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1744,7 +1744,9 @@ public class DataNode extends Configured + b + " (numBytes=" + b.getNumBytes() + ")" + ", stage=" + stage + ", clientname=" + clientname - + ", targets=" + Arrays.asList(targets)); + + ", targets=" + Arrays.asList(targets) + + ", target storage types=" + (targetStorageTypes == null ? "[]" : + Arrays.asList(targetStorageTypes))); } this.targets = targets; this.targetStorageTypes = targetStorageTypes; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index c5d6dab..96588ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -328,8 +328,6 @@ public class Mover { if (scheduleMoves4Block(diff, lb)) { hasRemaining |= (diff.existing.size() > 1 && diff.expected.size() > 1); - } else { - hasRemaining = false; // not able to schedule any move } } } @@ -453,9 +451,11 @@ public class Mover { static int run(Map<URI, List<Path>> namenodes, Configuration conf) throws IOException, InterruptedException { - final long sleeptime = 2000 * conf.getLong( - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); + final long sleeptime = + conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 + + conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; LOG.info("namenodes = " + namenodes); List<NameNodeConnector> connectors = Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index da7306c..158c225 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -256,7 +256,7 @@ public class TestBlockStoragePolicy { final short replication = 3; { - final List<StorageType> chosen = Arrays.asList(); + final List<StorageType> chosen = Lists.newArrayList(); method.checkChooseStorageTypes(hot, replication, chosen, StorageType.DISK, StorageType.DISK, StorageType.DISK); method.checkChooseStorageTypes(warm, replication, chosen, @@ -393,7 +393,7 @@ public class TestBlockStoragePolicy { final EnumSet<StorageType> unavailables = disk; final boolean isNewBlock = true; { - final List<StorageType> chosen = Arrays.asList(); + final List<StorageType> chosen = Lists.newArrayList(); checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); @@ -500,7 +500,7 @@ public class TestBlockStoragePolicy { final short replication = 3; { - final List<StorageType> chosen = Arrays.asList(); + final List<StorageType> chosen = Lists.newArrayList(); method.checkChooseStorageTypes(hot, replication, chosen, StorageType.DISK, StorageType.DISK, StorageType.DISK); method.checkChooseStorageTypes(warm, replication, chosen, @@ -603,7 +603,7 @@ public class TestBlockStoragePolicy { final EnumSet<StorageType> unavailables = disk; final boolean isNewBlock = false; { - final List<StorageType> chosen = Arrays.asList(); + final List<StorageType> chosen = Lists.newArrayList(); checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java index fda744f..d5d5cab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -44,9 +44,13 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.server.balancer.Dispatcher; import org.apache.hadoop.hdfs.server.balancer.ExitStatus; +import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Level; @@ -66,6 +70,8 @@ public class TestStorageMover { ).getLogger().setLevel(Level.ALL); ((Log4JLogger)LogFactory.getLog(Dispatcher.class) ).getLogger().setLevel(Level.ALL); + ((Log4JLogger)LogFactory.getLog(DataTransferProtocol.class)).getLogger() + .setLevel(Level.ALL); } private static final int BLOCK_SIZE = 1024; @@ -80,6 +86,8 @@ public class TestStorageMover { static { DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + DEFAULT_CONF.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, + 2L); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L); DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF); @@ -87,6 +95,9 @@ public class TestStorageMover { WARM = DEFAULT_POLICIES.getPolicy("WARM"); COLD = DEFAULT_POLICIES.getPolicy("COLD"); Dispatcher.setBlockMoveWaitTime(1000L); + Dispatcher.setDelayAfterErrors(1000L); + // do not create id file since we will eat up all the disk space + NameNodeConnector.setCreateIdFile(false); } /** @@ -151,7 +162,7 @@ public class TestStorageMover { ClusterScheme() { this(DEFAULT_CONF, NUM_DATANODES, REPL, - genStorageTypes(NUM_DATANODES, 1, 1), null); + genStorageTypes(NUM_DATANODES), null); } ClusterScheme(Configuration conf, int numDataNodes, short repl, @@ -195,7 +206,7 @@ public class TestStorageMover { dfs = cluster.getFileSystem(); } - private void runBasicTest(boolean shotdown) throws Exception { + private void runBasicTest(boolean shutdown) throws Exception { setupCluster(); try { prepareNamespace(); @@ -205,7 +216,7 @@ public class TestStorageMover { migrate(); verify(true); } finally { - if (shotdown) { + if (shutdown) { shutdownCluster(); } } @@ -233,7 +244,7 @@ public class TestStorageMover { /** * Run the migration tool. */ - void migrate(String... args) throws Exception { + void migrate() throws Exception { runMover(); Thread.sleep(5000); // let the NN finish deletion } @@ -242,6 +253,9 @@ public class TestStorageMover { * Verify block locations after running the migration tool. */ void verify(boolean verifyAll) throws Exception { + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.triggerBlockReport(dn); + } if (verifyAll) { verifyNamespace(); } else { @@ -308,7 +322,8 @@ public class TestStorageMover { final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types, lb.getStorageTypes()); Assert.assertTrue(fileStatus.getFullName(parent.toString()) - + " with policy " + policy + " has non-empty overlap: " + diff, + + " with policy " + policy + " has non-empty overlap: " + diff + + ", the corresponding block is " + lb.getBlock().getLocalBlock(), diff.removeOverlap()); } } @@ -378,6 +393,7 @@ public class TestStorageMover { return "[disk=" + disk + ", archive=" + archive + "]"; } } + private static StorageType[][] genStorageTypes(int numDataNodes) { return genStorageTypes(numDataNodes, 0, 0); } @@ -414,21 +430,6 @@ public class TestStorageMover { return capacities; } - /** - * A normal case for Mover: move a file into archival storage - */ - @Test - public void testMigrateFileToArchival() throws Exception { - final Path foo = new Path("/foo"); - Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap(); - policyMap.put(foo, COLD); - NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo), - 2*BLOCK_SIZE, null, policyMap); - ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, - NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); - new MigrationTest(clusterScheme, nsScheme).runBasicTest(true); - } - private static class PathPolicyMap { final Map<Path, BlockStoragePolicy> map = Maps.newHashMap(); final Path hot = new Path("/hot"); @@ -447,13 +448,13 @@ public class TestStorageMover { } } } - + NamespaceScheme newNamespaceScheme() { return new NamespaceScheme(Arrays.asList(hot, warm, cold), files, BLOCK_SIZE/2, null, map); } - - /** + + /** * Move hot files to warm and cold, warm files to hot and cold, * and cold files to hot and warm. */ @@ -473,21 +474,41 @@ public class TestStorageMover { } /** + * A normal case for Mover: move a file into archival storage + */ + @Test + public void testMigrateFileToArchival() throws Exception { + LOG.info("testMigrateFileToArchival"); + final Path foo = new Path("/foo"); + Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap(); + policyMap.put(foo, COLD); + NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo), + 2*BLOCK_SIZE, null, policyMap); + ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, + NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); + new MigrationTest(clusterScheme, nsScheme).runBasicTest(true); + } + + /** * Test directories with Hot, Warm and Cold polices. */ @Test public void testHotWarmColdDirs() throws Exception { + LOG.info("testHotWarmColdDirs"); PathPolicyMap pathPolicyMap = new PathPolicyMap(3); NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); ClusterScheme clusterScheme = new ClusterScheme(); MigrationTest test = new MigrationTest(clusterScheme, nsScheme); - test.runBasicTest(false); + try { + test.runBasicTest(false); + pathPolicyMap.moveAround(test.dfs); + test.migrate(); - pathPolicyMap.moveAround(test.dfs); - test.migrate(); - test.verify(true); - test.shutdownCluster(); + test.verify(true); + } finally { + test.shutdownCluster(); + } } /** @@ -495,76 +516,81 @@ public class TestStorageMover { */ @Test public void testNoSpaceDisk() throws Exception { + LOG.info("testNoSpaceDisk"); final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); - final long diskCapacity = (10 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; - final long archiveCapacity = 100*BLOCK_SIZE; + final long diskCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE) + * BLOCK_SIZE; + final long archiveCapacity = 100 * BLOCK_SIZE; final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, diskCapacity, archiveCapacity); - final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, + Configuration conf = new Configuration(DEFAULT_CONF); + final ClusterScheme clusterScheme = new ClusterScheme(conf, NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); - test.runBasicTest(false); - - // create hot files with replication 3 until not more spaces. - final short replication = 3; - { - int hotFileCount = 0; - try { - for(; ; hotFileCount++) { - final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + try { + test.runBasicTest(false); + + // create hot files with replication 3 until not more spaces. + final short replication = 3; + { + int hotFileCount = 0; + try { + for (; ; hotFileCount++) { + final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + } + } catch (IOException e) { + LOG.info("Expected: hotFileCount=" + hotFileCount, e); } - } catch(IOException e) { - LOG.info("Expected: hotFileCount=" + hotFileCount, e); + Assert.assertTrue(hotFileCount >= 1); } - Assert.assertTrue(hotFileCount >= 2); - } - // create hot files with replication 1 to use up all remaining spaces. - { - int hotFileCount_r1 = 0; - try { - for(; ; hotFileCount_r1++) { - final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L); + // create hot files with replication 1 to use up all remaining spaces. + { + int hotFileCount_r1 = 0; + try { + for (; ; hotFileCount_r1++) { + final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L); + } + } catch (IOException e) { + LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e); } - } catch(IOException e) { - LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e); } - } - { // test increasing replication. Since DISK is full, - // new replicas should be stored in ARCHIVE as a fallback storage. - final Path file0 = new Path(pathPolicyMap.hot, "file0"); - final Replication r = test.getReplication(file0); - final short newReplication = (short)5; - test.dfs.setReplication(file0, newReplication); - Thread.sleep(10000); - test.verifyReplication(file0, r.disk, newReplication - r.disk); - } + { // test increasing replication. Since DISK is full, + // new replicas should be stored in ARCHIVE as a fallback storage. + final Path file0 = new Path(pathPolicyMap.hot, "file0"); + final Replication r = test.getReplication(file0); + final short newReplication = (short) 5; + test.dfs.setReplication(file0, newReplication); + Thread.sleep(10000); + test.verifyReplication(file0, r.disk, newReplication - r.disk); + } - { // test creating a cold file and then increase replication - final Path p = new Path(pathPolicyMap.cold, "foo"); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); - test.verifyReplication(p, 0, replication); + { // test creating a cold file and then increase replication + final Path p = new Path(pathPolicyMap.cold, "foo"); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + test.verifyReplication(p, 0, replication); - final short newReplication = 5; - test.dfs.setReplication(p, newReplication); - Thread.sleep(10000); - test.verifyReplication(p, 0, newReplication); - } + final short newReplication = 5; + test.dfs.setReplication(p, newReplication); + Thread.sleep(10000); + test.verifyReplication(p, 0, newReplication); + } - { //test move a hot file to warm - final Path file1 = new Path(pathPolicyMap.hot, "file1"); - test.dfs.rename(file1, pathPolicyMap.warm); - test.migrate(); - test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());; + { //test move a hot file to warm + final Path file1 = new Path(pathPolicyMap.hot, "file1"); + test.dfs.rename(file1, pathPolicyMap.warm); + test.migrate(); + test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId()); + } + } finally { + test.shutdownCluster(); } - - test.shutdownCluster(); } /** @@ -572,73 +598,77 @@ public class TestStorageMover { */ @Test public void testNoSpaceArchive() throws Exception { + LOG.info("testNoSpaceArchive"); final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); - final long diskCapacity = 100*BLOCK_SIZE; - final long archiveCapacity = (10 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; + final long diskCapacity = 100 * BLOCK_SIZE; + final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE) + * BLOCK_SIZE; final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, diskCapacity, archiveCapacity); final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); - test.runBasicTest(false); - - // create cold files with replication 3 until not more spaces. - final short replication = 3; - { - int coldFileCount = 0; - try { - for(; ; coldFileCount++) { - final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + try { + test.runBasicTest(false); + + // create cold files with replication 3 until not more spaces. + final short replication = 3; + { + int coldFileCount = 0; + try { + for (; ; coldFileCount++) { + final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + } + } catch (IOException e) { + LOG.info("Expected: coldFileCount=" + coldFileCount, e); } - } catch(IOException e) { - LOG.info("Expected: coldFileCount=" + coldFileCount, e); + Assert.assertTrue(coldFileCount >= 1); } - Assert.assertTrue(coldFileCount >= 2); - } - // create cold files with replication 1 to use up all remaining spaces. - { - int coldFileCount_r1 = 0; - try { - for(; ; coldFileCount_r1++) { - final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L); + // create cold files with replication 1 to use up all remaining spaces. + { + int coldFileCount_r1 = 0; + try { + for (; ; coldFileCount_r1++) { + final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L); + } + } catch (IOException e) { + LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e); } - } catch(IOException e) { - LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e); } - } - { // test increasing replication but new replicas cannot be created - // since no more ARCHIVE space. - final Path file0 = new Path(pathPolicyMap.cold, "file0"); - final Replication r = test.getReplication(file0); - LOG.info("XXX " + file0 + ": replication=" + r); - Assert.assertEquals(0, r.disk); + { // test increasing replication but new replicas cannot be created + // since no more ARCHIVE space. + final Path file0 = new Path(pathPolicyMap.cold, "file0"); + final Replication r = test.getReplication(file0); + LOG.info("XXX " + file0 + ": replication=" + r); + Assert.assertEquals(0, r.disk); - final short newReplication = (short)5; - test.dfs.setReplication(file0, newReplication); - Thread.sleep(10000); + final short newReplication = (short) 5; + test.dfs.setReplication(file0, newReplication); + Thread.sleep(10000); - test.verifyReplication(file0, 0, r.archive); - } + test.verifyReplication(file0, 0, r.archive); + } - { // test creating a hot file - final Path p = new Path(pathPolicyMap.hot, "foo"); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)3, 0L); - } + { // test creating a hot file + final Path p = new Path(pathPolicyMap.hot, "foo"); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 3, 0L); + } - { //test move a cold file to warm - final Path file1 = new Path(pathPolicyMap.hot, "file1"); - test.dfs.rename(file1, pathPolicyMap.warm); - test.migrate(); - test.verify(true); + { //test move a cold file to warm + final Path file1 = new Path(pathPolicyMap.cold, "file1"); + test.dfs.rename(file1, pathPolicyMap.warm); + test.migrate(); + test.verify(true); + } + } finally { + test.shutdownCluster(); } - - test.shutdownCluster(); } }