http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2e7fbd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index 2a7bde5..9354044 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -72,7 +72,6 @@ import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.After; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -147,12 +146,11 @@ public class TestStoragePolicySatisfier { startAdditionalDNs(config, 3, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); + dfs.satisfyStoragePolicy(new Path(file)); // Wait till namenode notified about the block location details - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000, + dfs); } @Test(timeout = 300000) @@ -1284,6 +1282,7 @@ public class TestStoragePolicySatisfier { {StorageType.ARCHIVE, StorageType.SSD}, {StorageType.DISK, StorageType.DISK}}; config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, storagesPerDatanode, capacity); dfs = hdfsCluster.getFileSystem(); @@ -1299,19 +1298,28 @@ public class TestStoragePolicySatisfier { //Queue limit can control the traverse logic to wait for some free //entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); - Mockito.when(sps.isRunning()).thenReturn(true); - Context ctxt = Mockito.mock(Context.class); - config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); - Mockito.when(ctxt.getConf()).thenReturn(config); - Mockito.when(ctxt.isRunning()).thenReturn(true); - Mockito.when(ctxt.isInSafeMode()).thenReturn(false); - Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); - BlockStorageMovementNeeded movmentNeededQueue = - new BlockStorageMovementNeeded(ctxt); + StoragePolicySatisfier sps = new StoragePolicySatisfier(config); + Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), + hdfsCluster.getNamesystem().getBlockManager(), sps) { + @Override + public boolean isInSafeMode() { + return false; + } + + @Override + public boolean isRunning() { + return true; + } + }; + + FileIdCollector fileIDCollector = + new IntraSPSNameNodeFileIdCollector(fsDir, sps); + sps.init(ctxt, fileIDCollector, null); + sps.getStorageMovementQueue().activate(); + INode rootINode = fsDir.getINode("/root"); - movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); - movmentNeededQueue.init(fsDir); + hdfsCluster.getNamesystem().getBlockManager() + .addSPSPathId(rootINode.getId()); //Wait for thread to reach U. Thread.sleep(1000); @@ -1321,7 +1329,7 @@ public class TestStoragePolicySatisfier { // Remove 10 element and make queue free, So other traversing will start. for (int i = 0; i < 10; i++) { String path = expectedTraverseOrder.remove(0); - long trackId = movmentNeededQueue.get().getTrackId(); + long trackId = sps.getStorageMovementQueue().get().getFileId(); INode inode = fsDir.getInode(trackId); assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); @@ -1332,7 +1340,7 @@ public class TestStoragePolicySatisfier { // Check other element traversed in order and R,S should not be added in // queue which we already removed from expected list for (String path : expectedTraverseOrder) { - long trackId = movmentNeededQueue.get().getTrackId(); + long trackId = sps.getStorageMovementQueue().get().getFileId(); INode inode = fsDir.getInode(trackId); assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); @@ -1352,6 +1360,7 @@ public class TestStoragePolicySatisfier { {StorageType.ARCHIVE, StorageType.SSD}, {StorageType.DISK, StorageType.DISK}}; config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, storagesPerDatanode, capacity); dfs = hdfsCluster.getFileSystem(); @@ -1366,21 +1375,33 @@ public class TestStoragePolicySatisfier { expectedTraverseOrder.remove("/root/D/M"); expectedTraverseOrder.remove("/root/E"); FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory(); - StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); - Mockito.when(sps.isRunning()).thenReturn(true); + // Queue limit can control the traverse logic to wait for some free // entry in queue. After 10 files, traverse control will be on U. - Context ctxt = Mockito.mock(Context.class); - config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); - Mockito.when(ctxt.getConf()).thenReturn(config); - Mockito.when(ctxt.isRunning()).thenReturn(true); - Mockito.when(ctxt.isInSafeMode()).thenReturn(false); - Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); - BlockStorageMovementNeeded movmentNeededQueue = - new BlockStorageMovementNeeded(ctxt); - movmentNeededQueue.init(fsDir); + // StoragePolicySatisfier sps = new StoragePolicySatisfier(config); + StoragePolicySatisfier sps = new StoragePolicySatisfier(config); + Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), + hdfsCluster.getNamesystem().getBlockManager(), sps) { + @Override + public boolean isInSafeMode() { + return false; + } + + @Override + public boolean isRunning() { + return true; + } + }; + + FileIdCollector fileIDCollector = + new IntraSPSNameNodeFileIdCollector(fsDir, sps); + sps.init(ctxt, fileIDCollector, null); + sps.getStorageMovementQueue().activate(); + INode rootINode = fsDir.getINode("/root"); - movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); + hdfsCluster.getNamesystem().getBlockManager() + .addSPSPathId(rootINode.getId()); + // Wait for thread to reach U. Thread.sleep(1000); @@ -1389,7 +1410,7 @@ public class TestStoragePolicySatisfier { // Remove 10 element and make queue free, So other traversing will start. for (int i = 0; i < 10; i++) { String path = expectedTraverseOrder.remove(0); - long trackId = movmentNeededQueue.get().getTrackId(); + long trackId = sps.getStorageMovementQueue().get().getFileId(); INode inode = fsDir.getInode(trackId); assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); @@ -1400,7 +1421,7 @@ public class TestStoragePolicySatisfier { // Check other element traversed in order and E, M, U, R, S should not be // added in queue which we already removed from expected list for (String path : expectedTraverseOrder) { - long trackId = movmentNeededQueue.get().getTrackId(); + long trackId = sps.getStorageMovementQueue().get().getFileId(); INode inode = fsDir.getInode(trackId); assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); @@ -1502,17 +1523,20 @@ public class TestStoragePolicySatisfier { hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2) .storageTypes(storagetypes).build(); hdfsCluster.waitActive(); - BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(20000); + // BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(200000); dfs = hdfsCluster.getFileSystem(); Path filePath = new Path("/file"); DFSTestUtil.createFile(dfs, filePath, 1024, (short) 2, 0); dfs.setStoragePolicy(filePath, "COLD"); dfs.satisfyStoragePolicy(filePath); + Thread.sleep(3000); StoragePolicySatisfyPathStatus status = dfs.getClient() .checkStoragePolicySatisfyPathStatus(filePath.toString()); - Assert.assertTrue("Status should be IN_PROGRESS", - StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status)); + Assert.assertTrue( + "Status should be IN_PROGRESS/SUCCESS, but status is " + status, + StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status) + || StoragePolicySatisfyPathStatus.SUCCESS.equals(status)); DFSTestUtil.waitExpectedStorageType(filePath.toString(), StorageType.ARCHIVE, 2, 30000, dfs); @@ -1530,7 +1554,7 @@ public class TestStoragePolicySatisfier { return false; } }, 100, 60000); - + BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(1000); // wait till status is NOT_AVAILABLE GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override @@ -1719,8 +1743,10 @@ public class TestStoragePolicySatisfier { public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - sps.getAttemptedItemsMonitor().getAttemptedItemsCount()); - return sps.getAttemptedItemsMonitor() + ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())).getAttemptedItemsCount()); + return ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())) .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } }, 100, timeout); @@ -1736,8 +1762,11 @@ public class TestStoragePolicySatisfier { public Boolean get() { LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMovementFinishedBlocksCount, - sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()); - return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount() + ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount()); + return ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())) + .getMovementFinishedBlocksCount() >= expectedMovementFinishedBlocksCount; } }, 100, timeout);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2e7fbd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java index c1a2b8b..0e3a5a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java @@ -500,9 +500,11 @@ public class TestStoragePolicySatisfierWithStripedFile { public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - sps.getAttemptedItemsMonitor().getAttemptedItemsCount()); - return sps.getAttemptedItemsMonitor() - .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; + ((BlockStorageMovementAttemptedItems) sps + .getAttemptedItemsMonitor()).getAttemptedItemsCount()); + return ((BlockStorageMovementAttemptedItems) sps + .getAttemptedItemsMonitor()) + .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } }, 100, timeout); } @@ -560,7 +562,7 @@ public class TestStoragePolicySatisfierWithStripedFile { // Check whether the block movement attempt report has been arrived at the // Namenode(SPS). private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster, - long expectedMovementFinishedBlocksCount, int timeout) + long expectedMoveFinishedBlks, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); @@ -570,10 +572,11 @@ public class TestStoragePolicySatisfierWithStripedFile { @Override public Boolean get() { LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", - expectedMovementFinishedBlocksCount, - sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()); - return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount() - >= expectedMovementFinishedBlocksCount; + expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps + .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount()); + return ((BlockStorageMovementAttemptedItems) sps + .getAttemptedItemsMonitor()) + .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks; } }, 100, timeout); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org