http://git-wip-us.apache.org/repos/asf/hadoop/blob/24bcc8c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index b05717a..ec5307b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -108,8 +108,6 @@ public class TestStoragePolicySatisfier { public static final long CAPACITY = 2 * 256 * 1024 * 1024; public static final String FILE = "/testMoveToSatisfyStoragePolicy"; public static final int DEFAULT_BLOCK_SIZE = 1024; - private ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); /** * Sets hdfs cluster. @@ -1282,8 +1280,8 @@ public class TestStoragePolicySatisfier { //Queue limit can control the traverse logic to wait for some free //entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config); - Context<Long> ctxt = new IntraSPSNameNodeContext( + StoragePolicySatisfier sps = new StoragePolicySatisfier(config); + Context ctxt = new IntraSPSNameNodeContext( hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @Override @@ -1297,8 +1295,7 @@ public class TestStoragePolicySatisfier { } }; - FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt); - sps.init(ctxt, fileIDCollector, null, null); + sps.init(ctxt); sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); @@ -1314,13 +1311,6 @@ public class TestStoragePolicySatisfier { dfs.delete(new Path("/root"), true); } - public FileCollector<Long> createFileIdCollector( - StoragePolicySatisfier<Long> sps, Context<Long> ctxt) { - FileCollector<Long> fileIDCollector = new IntraSPSNameNodeFileIdCollector( - hdfsCluster.getNamesystem().getFSDirectory(), sps); - return fileIDCollector; - } - /** * Test traverse when root parent got deleted. * 1. Delete L when traversing Q @@ -1351,8 +1341,8 @@ public class TestStoragePolicySatisfier { // Queue limit can control the traverse logic to wait for some free // entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config); - Context<Long> ctxt = new IntraSPSNameNodeContext( + StoragePolicySatisfier sps = new StoragePolicySatisfier(config); + Context ctxt = new IntraSPSNameNodeContext( hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @Override @@ -1365,8 +1355,7 @@ public class TestStoragePolicySatisfier { return true; } }; - FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt); - sps.init(ctxt, fileIDCollector, null, null); + sps.init(ctxt); sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); @@ -1383,12 +1372,12 @@ public class TestStoragePolicySatisfier { } private void assertTraversal(List<String> expectedTraverseOrder, - FSDirectory fsDir, StoragePolicySatisfier<Long> sps) + FSDirectory fsDir, StoragePolicySatisfier sps) throws InterruptedException { // Remove 10 element and make queue free, So other traversing will start. for (int i = 0; i < 10; i++) { String path = expectedTraverseOrder.remove(0); - ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get(); + ItemInfo itemInfo = sps.getStorageMovementQueue().get(); if (itemInfo == null) { continue; } @@ -1403,7 +1392,7 @@ public class TestStoragePolicySatisfier { // Check other element traversed in order and E, M, U, R, S should not be // added in queue which we already removed from expected list for (String path : expectedTraverseOrder) { - ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get(); + ItemInfo itemInfo = sps.getStorageMovementQueue().get(); if (itemInfo == null) { continue; } @@ -1717,17 +1706,17 @@ public class TestStoragePolicySatisfier { public void waitForAttemptedItems(long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) blockManager.getSPSManager() + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager.getSPSManager() .getInternalSPSService(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems<Long>) (sps + ((BlockStorageMovementAttemptedItems) (sps .getAttemptedItemsMonitor())).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems<Long>) (sps + return ((BlockStorageMovementAttemptedItems) (sps .getAttemptedItemsMonitor())) .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } @@ -1737,15 +1726,17 @@ public class TestStoragePolicySatisfier { public void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { - Assert.assertNotNull("Didn't set external block move listener", - blkMoveListener); + BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager.getSPSManager() + .getInternalSPSService(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { - int actualCount = blkMoveListener.getActualBlockMovements().size(); + int actualCount = ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())).getAttemptedItemsCount(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", - expectedMovementFinishedBlocksCount, - actualCount); + expectedMovementFinishedBlocksCount, actualCount); return actualCount >= expectedMovementFinishedBlocksCount; } @@ -1798,29 +1789,12 @@ public class TestStoragePolicySatisfier { .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) .storageTypes(storageTypes).storageCapacities(capacities).build(); cluster.waitActive(); - - // Sets external listener for assertion. - blkMoveListener.clear(); - BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) blockManager - .getSPSManager().getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); return cluster; } public void restartNamenode() throws IOException { hdfsCluster.restartNameNodes(); hdfsCluster.waitActive(); - BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - StoragePolicySatisfyManager spsMgr = blockManager.getSPSManager(); - if (spsMgr != null && spsMgr.isInternalSatisfierRunning()) { - // Sets external listener for assertion. - blkMoveListener.clear(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) spsMgr.getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); - } } /**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24bcc8c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java index 857bd6c..8a25a5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier.ExternalBlockMovementListener; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Before; @@ -71,8 +70,6 @@ public class TestStoragePolicySatisfierWithStripedFile { private int cellSize; private int defaultStripeBlockSize; private Configuration conf; - private ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); private ErasureCodingPolicy getEcPolicy() { return StripedFileTestUtil.getDefaultECPolicy(); @@ -94,6 +91,8 @@ public class TestStoragePolicySatisfierWithStripedFile { // Reduced refresh cycle to update latest datanodes. conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS, 1000); + conf.setInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, 30); initConfWithStripe(conf, defaultStripeBlockSize); } @@ -135,14 +134,6 @@ public class TestStoragePolicySatisfierWithStripedFile { try { cluster.waitActive(); - // Sets external listener for assertion. - blkMoveListener.clear(); - BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) blockManager - .getSPSManager().getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); - DistributedFileSystem dfs = cluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -253,14 +244,6 @@ public class TestStoragePolicySatisfierWithStripedFile { try { cluster.waitActive(); - // Sets external listener for assertion. - blkMoveListener.clear(); - BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) blockManager - .getSPSManager().getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); - DistributedFileSystem dfs = cluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -400,10 +383,11 @@ public class TestStoragePolicySatisfierWithStripedFile { fs.satisfyStoragePolicy(fooFile); DFSTestUtil.waitExpectedStorageType(fooFile.toString(), StorageType.ARCHIVE, 5, 30000, cluster.getFileSystem()); - //Start reaming datanodes + //Start remaining datanodes for (int i = numOfDatanodes - 1; i >= 5; i--) { cluster.restartDataNode(list.get(i), false); } + cluster.waitActive(); // verify storage types and locations. waitExpectedStorageType(cluster, fooFile.toString(), fileLen, StorageType.ARCHIVE, 9, 9, 60000); @@ -511,17 +495,17 @@ public class TestStoragePolicySatisfierWithStripedFile { long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier<Long> sps = - (StoragePolicySatisfier<Long>) blockManager + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager .getSPSManager().getInternalSPSService(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems<Long>) sps + ((BlockStorageMovementAttemptedItems) sps .getAttemptedItemsMonitor()).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems<Long>) sps + return ((BlockStorageMovementAttemptedItems) sps .getAttemptedItemsMonitor()) .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } @@ -583,12 +567,15 @@ public class TestStoragePolicySatisfierWithStripedFile { private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster, long expectedMoveFinishedBlks, int timeout) throws TimeoutException, InterruptedException { - Assert.assertNotNull("Didn't set external block move listener", - blkMoveListener); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager.getSPSManager() + .getInternalSPSService(); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { - int actualCount = blkMoveListener.getActualBlockMovements().size(); + int actualCount = ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMoveFinishedBlks, actualCount); http://git-wip-us.apache.org/repos/asf/hadoop/blob/24bcc8c3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index be243cb..18acb50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -88,10 +88,8 @@ public class TestExternalStoragePolicySatisfier private String principal; private MiniKdc kdc; private File baseDir; - private StoragePolicySatisfier<String> externalSps; + private StoragePolicySatisfier externalSps; private ExternalSPSContext externalCtxt; - private ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); @After public void destroy() throws Exception { @@ -143,16 +141,10 @@ public class TestExternalStoragePolicySatisfier nnc = getNameNodeConnector(getConf()); - externalSps = new StoragePolicySatisfier<String>(getConf()); + externalSps = new StoragePolicySatisfier(getConf()); externalCtxt = new ExternalSPSContext(externalSps, nnc); - blkMoveListener.clear(); - ExternalSPSBlockMoveTaskHandler externalHandler = - new ExternalSPSBlockMoveTaskHandler(conf, nnc, - externalSps); - externalSps.init(externalCtxt, - new ExternalSPSFilePathCollector(externalSps), externalHandler, - blkMoveListener); + externalSps.init(externalCtxt); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); return cluster; } @@ -164,16 +156,10 @@ public class TestExternalStoragePolicySatisfier getCluster().restartNameNodes(); getCluster().waitActive(); - externalSps = new StoragePolicySatisfier<>(getConf()); + externalSps = new StoragePolicySatisfier(getConf()); externalCtxt = new ExternalSPSContext(externalSps, nnc); - blkMoveListener.clear(); - ExternalSPSBlockMoveTaskHandler externalHandler = - new ExternalSPSBlockMoveTaskHandler(getConf(), nnc, - externalSps); - externalSps.init(externalCtxt, - new ExternalSPSFilePathCollector(externalSps), externalHandler, - blkMoveListener); + externalSps.init(externalCtxt); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); } @@ -206,11 +192,11 @@ public class TestExternalStoragePolicySatisfier public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems<String>) (externalSps + ((BlockStorageMovementAttemptedItems) (externalSps .getAttemptedItemsMonitor())).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems<String>) (externalSps + return ((BlockStorageMovementAttemptedItems) (externalSps .getAttemptedItemsMonitor())) - .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; + .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } }, 100, timeout); } @@ -218,12 +204,11 @@ public class TestExternalStoragePolicySatisfier public void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { - Assert.assertNotNull("Didn't set external block move listener", - blkMoveListener); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { - int actualCount = blkMoveListener.getActualBlockMovements().size(); + int actualCount = externalSps.getAttemptedItemsMonitor() + .getAttemptedItemsCount(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMovementFinishedBlocksCount, actualCount); return actualCount --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org