http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67fbac3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 57e9f94..70219f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -203,11 +203,11 @@ public class TestStoragePolicySatisfier { } /** - * Tests to verify that the block storage movement results will be propagated + * Tests to verify that the block storage movement report will be propagated * to Namenode via datanode heartbeat. */ @Test(timeout = 300000) - public void testPerTrackIdBlocksStorageMovementResults() throws Exception { + public void testBlksStorageMovementAttemptFinishedReport() throws Exception { try { createCluster(); // Change policy to ONE_SSD @@ -229,7 +229,7 @@ public class TestStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType( file, StorageType.DISK, 2, 30000, dfs); - waitForBlocksMovementResult(1, 30000); + waitForBlocksMovementAttemptReport(1, 30000); } finally { shutdownCluster(); } @@ -276,7 +276,7 @@ public class TestStoragePolicySatisfier { fileName, StorageType.DISK, 2, 30000, dfs); } - waitForBlocksMovementResult(files.size(), 30000); + waitForBlocksMovementAttemptReport(files.size(), 30000); } finally { shutdownCluster(); } @@ -457,7 +457,7 @@ public class TestStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType( file, StorageType.DISK, 2, 30000, dfs); - waitForBlocksMovementResult(1, 30000); + waitForBlocksMovementAttemptReport(1, 30000); } finally { shutdownCluster(); } @@ -630,7 +630,7 @@ public class TestStoragePolicySatisfier { // No block movement will be scheduled as there is no target node // available with the required storage type. waitForAttemptedItems(1, 30000); - waitForBlocksMovementResult(1, 30000); + waitForBlocksMovementAttemptReport(1, 30000); DFSTestUtil.waitExpectedStorageType( file1, StorageType.ARCHIVE, 1, 30000, dfs); DFSTestUtil.waitExpectedStorageType( @@ -691,7 +691,7 @@ public class TestStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType( file, StorageType.DISK, 3, 30000, dfs); - waitForBlocksMovementResult(1, 30000); + waitForBlocksMovementAttemptReport(1, 30000); } finally { shutdownCluster(); } @@ -871,7 +871,7 @@ public class TestStoragePolicySatisfier { Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem() .getBlockManager().getDatanodeManager().getDatanodes(); for (DatanodeDescriptor dd : dns) { - assertNull(dd.getBlocksToMoveStorages()); + assertNull(dd.getBlocksToMoveStorages(1)); } // Enable heart beats now @@ -1224,7 +1224,7 @@ public class TestStoragePolicySatisfier { /** * Test SPS for batch processing. */ - @Test(timeout = 300000) + @Test(timeout = 3000000) public void testBatchProcessingForSPSDirectory() throws Exception { try { StorageType[][] diskTypes = new StorageType[][] { @@ -1252,7 +1252,7 @@ public class TestStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2, 30000, dfs); } - waitForBlocksMovementResult(files.size(), 30000); + waitForBlocksMovementAttemptReport(files.size(), 30000); String expectedLogMessage = "StorageMovementNeeded queue remaining" + " capacity is zero"; assertTrue("Log output does not contain expected log message: " @@ -1268,7 +1268,7 @@ public class TestStoragePolicySatisfier { * 1. Delete /root when traversing Q * 2. U, R, S should not be in queued. */ - @Test + @Test(timeout = 300000) public void testTraverseWhenParentDeleted() throws Exception { StorageType[][] diskTypes = new StorageType[][] { {StorageType.DISK, StorageType.ARCHIVE}, @@ -1330,7 +1330,7 @@ public class TestStoragePolicySatisfier { * 1. Delete L when traversing Q * 2. E, M, U, R, S should not be in queued. */ - @Test + @Test(timeout = 300000) public void testTraverseWhenRootParentDeleted() throws Exception { StorageType[][] diskTypes = new StorageType[][] { {StorageType.DISK, StorageType.ARCHIVE}, @@ -1387,6 +1387,82 @@ public class TestStoragePolicySatisfier { dfs.delete(new Path("/root"), true); } + /** + * Test storage move blocks while under replication block tasks exists in the + * system. So, both will share the max transfer streams. + * + * 1. Create cluster with 3 datanode. + * 2. Create 20 files with 2 replica. + * 3. Start 2 more DNs with DISK & SSD types + * 4. SetReplication factor for the 1st 10 files to 4 to trigger replica task + * 5. Set policy to SSD to the 2nd set of files from 11-20 + * 6. Call SPS for 11-20 files to trigger move block tasks to new DNs + * 7. Wait for the under replica and SPS tasks completion + */ + @Test(timeout = 300000) + public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception { + try { + config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3); + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + config.set(DFSConfigKeys + .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, + "3000"); + config.setBoolean(DFSConfigKeys + .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY, + true); + + StorageType[][] storagetypes = new StorageType[][] { + {StorageType.ARCHIVE, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.DISK}}; + hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2) + .storageTypes(storagetypes).build(); + hdfsCluster.waitActive(); + dfs = hdfsCluster.getFileSystem(); + + // Below files will be used for pending replication block tasks. + for (int i=1; i<=20; i++){ + Path filePath = new Path("/file" + i); + DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 2, + 0); + } + + StorageType[][] newtypes = + new StorageType[][]{{StorageType.DISK, StorageType.SSD}, + {StorageType.DISK, StorageType.SSD}}; + startAdditionalDNs(config, 2, numOfDatanodes, newtypes, + storagesPerDatanode, capacity, hdfsCluster); + + // increase replication factor to 4 for the first 10 files and thus + // initiate replica tasks + for (int i=1; i<=10; i++){ + Path filePath = new Path("/file" + i); + dfs.setReplication(filePath, (short) 4); + } + + // invoke SPS for 11-20 files + for (int i = 11; i <= 20; i++) { + Path filePath = new Path("/file" + i); + dfs.setStoragePolicy(filePath, "ALL_SSD"); + dfs.satisfyStoragePolicy(filePath); + } + + for (int i = 1; i <= 10; i++) { + Path filePath = new Path("/file" + i); + DFSTestUtil.waitExpectedStorageType(filePath.toString(), + StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem()); + } + for (int i = 11; i <= 20; i++) { + Path filePath = new Path("/file" + i); + DFSTestUtil.waitExpectedStorageType(filePath.toString(), + StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem()); + } + } finally { + shutdownCluster(); + } + } + private static void createDirectoryTree(DistributedFileSystem dfs) throws Exception { // tree structure @@ -1514,18 +1590,19 @@ public class TestStoragePolicySatisfier { }, 100, timeout); } - private void waitForBlocksMovementResult(long expectedBlkMovResultsCount, - int timeout) throws TimeoutException, InterruptedException { + private void waitForBlocksMovementAttemptReport( + long expectedMovementFinishedBlocksCount, int timeout) + throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { - LOG.info("expectedResultsCount={} actualResultsCount={}", - expectedBlkMovResultsCount, - sps.getAttemptedItemsMonitor().resultsCount()); - return sps.getAttemptedItemsMonitor() - .resultsCount() == expectedBlkMovResultsCount; + LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", + expectedMovementFinishedBlocksCount, + sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()); + return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount() + >= expectedMovementFinishedBlocksCount; } }, 100, timeout); }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a67fbac3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java index fc5d0a5..154ddae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java @@ -180,7 +180,7 @@ public class TestStoragePolicySatisfierWithStripedFile { LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); cluster.triggerHeartbeats(); - waitForBlocksMovementResult(cluster, 1, 60000); + waitForBlocksMovementAttemptReport(cluster, 9, 60000); // verify storage types and locations waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9, 9, 60000); @@ -290,7 +290,7 @@ public class TestStoragePolicySatisfierWithStripedFile { LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); cluster.triggerHeartbeats(); - waitForBlocksMovementResult(cluster, 1, 60000); + waitForBlocksMovementAttemptReport(cluster, 5, 60000); waitForAttemptedItems(cluster, 1, 30000); // verify storage types and locations. waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5, @@ -556,10 +556,10 @@ public class TestStoragePolicySatisfierWithStripedFile { }, 100, timeout); } - // Check whether the block movement result has been arrived at the + // Check whether the block movement attempt report has been arrived at the // Namenode(SPS). - private void waitForBlocksMovementResult(MiniDFSCluster cluster, - long expectedBlkMovResultsCount, int timeout) + private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster, + long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); @@ -568,11 +568,11 @@ public class TestStoragePolicySatisfierWithStripedFile { GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { - LOG.info("expectedResultsCount={} actualResultsCount={}", - expectedBlkMovResultsCount, - sps.getAttemptedItemsMonitor().resultsCount()); - return sps.getAttemptedItemsMonitor() - .resultsCount() == expectedBlkMovResultsCount; + LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", + expectedMovementFinishedBlocksCount, + sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()); + return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount() + >= expectedMovementFinishedBlocksCount; } }, 100, timeout); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org