Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 a27516211 -> 2666d51fb
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index b84b1d2..3681cae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -36,8 +36,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -180,11 +178,10 @@ public class TestStoragePolicySatisfyWorker {
           lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
           lb.getStorageTypes()[0], StorageType.ARCHIVE);
       blockMovingInfos.add(blockMovingInfo);
-      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-      worker.processBlockMovingTasks(inode.getId(),
-          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
+          blockMovingInfos);
 
-      waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+      waitForBlockMovementCompletion(worker, 1, 30000);
     } finally {
       worker.stop();
     }
@@ -226,50 +223,42 @@ public class TestStoragePolicySatisfyWorker {
             locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
         blockMovingInfos.add(blockMovingInfo);
       }
-      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-      worker.processBlockMovingTasks(inode.getId(),
-          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
+          blockMovingInfos);
       // Wait till results queue build up
-      waitForBlockMovementResult(worker, inode.getId(), 30000);
+      waitForBlockMovementResult(worker, 30000);
       worker.dropSPSWork();
       assertTrue(worker.getBlocksMovementsStatusHandler()
-          .getBlksMovementResults().size() == 0);
+          .getMoveAttemptFinishedBlocks().size() == 0);
     } finally {
       worker.stop();
     }
   }
 
   private void waitForBlockMovementResult(
-      final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
-      throws Exception {
+      final StoragePolicySatisfyWorker worker, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsStatusHandler().getBlksMovementResults();
+        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
+            .getMoveAttemptFinishedBlocks();
         return completedBlocks.size() > 0;
       }
     }, 100, timeout);
   }
 
   private void waitForBlockMovementCompletion(
-      final StoragePolicySatisfyWorker worker, final long inodeId,
-      int expectedFailedItemsCount, int timeout) throws Exception {
+      final StoragePolicySatisfyWorker worker,
+      int expectedFinishedItemsCount, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsStatusHandler().getBlksMovementResults();
-        int failedCount = 0;
-        for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
-          if (blkMovementResult.getStatus() ==
-              BlocksStorageMovementResult.Status.FAILURE) {
-            failedCount++;
-          }
-        }
+        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
+            .getMoveAttemptFinishedBlocks();
+        int finishedCount = completedBlocks.size();
         LOG.info("Block movement completed count={}, expected={} and actual={}",
-            completedBlocks.size(), expectedFailedItemsCount, failedCount);
-        return expectedFailedItemsCount == failedCount;
+            completedBlocks.size(), expectedFinishedItemsCount, finishedCount);
+        return expectedFinishedItemsCount == finishedCount;
       }
     }, 100, timeout);
   }
@@ -304,8 +293,7 @@ public class TestStoragePolicySatisfyWorker {
   private BlockMovingInfo prepareBlockMovingInfo(Block block,
       DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
       StorageType targetStorageType) {
-    return new BlockMovingInfo(block, new DatanodeInfo[] {src},
-        new DatanodeInfo[] {destin}, new StorageType[] {storageType},
-        new StorageType[] {targetStorageType});
+    return new BlockMovingInfo(block, src, destin, storageType,
+        targetStorageType);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index df120ca..20402f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -112,7 +112,7 @@ public class TestStorageReport {
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
         Mockito.any(SlowDiskReports.class),
-        Mockito.any(BlocksStorageMovementResult[].class));
+        Mockito.any(BlocksStorageMoveAttemptFinished.class));
 
     StorageReport[] reports = captor.getValue();
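The two files above replace the per-trackId BlocksStorageMovementResult plumbing with a flat list of move-attempt-finished blocks on the DN side. As orientation, the waiting pattern the updated worker tests rely on looks roughly like this; a minimal sketch only, where the helper name waitForFinishedBlocks is illustrative and `worker` is assumed to be a started StoragePolicySatisfyWorker:

  // Illustrative helper mirroring waitForBlockMovementResult above:
  // poll until the DN-side status handler reports at least `expected`
  // move-attempt-finished blocks.
  private void waitForFinishedBlocks(final StoragePolicySatisfyWorker worker,
      final int expected, int timeoutMillis) throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        List<Block> finished = worker.getBlocksMovementsStatusHandler()
            .getMoveAttemptFinishedBlocks();
        return finished.size() >= expected;
      }
    }, 100, timeoutMillis);
  }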
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 1e016f7..ec00ae7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -958,7 +958,7 @@ public class NNThroughputBenchmark implements Tool {
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true, SlowPeerReports.EMPTY_REPORT,
           SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMovementResult[0]).getCommands();
+          new BlocksStorageMoveAttemptFinished(null)).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1009,7 +1009,7 @@ public class NNThroughputBenchmark implements Tool {
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true, SlowPeerReports.EMPTY_REPORT,
           SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMovementResult[0]).getCommands();
+          new BlocksStorageMoveAttemptFinished(null)).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index ba29c82..b2b878d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -128,7 +128,7 @@ public class NameNodeAdapter {
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
         SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-        new BlocksStorageMovementResult[0]);
+        new BlocksStorageMoveAttemptFinished(null));
   }
 
   public static boolean setReplication(final FSNamesystem ns,
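These heartbeat call sites all change the same way: the trailing BlocksStorageMovementResult[] argument becomes a single BlocksStorageMoveAttemptFinished. Schematically, with the argument list copied from the hunks above (the zeroed counters are the tests' placeholders, and null stands for "no finished blocks to report"):

  DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
      0L, 0L, 0, 0, 0, null, true, SlowPeerReports.EMPTY_REPORT,
      SlowDiskReports.EMPTY_REPORT,
      // was: new BlocksStorageMovementResult[0]
      new BlocksStorageMoveAttemptFinished(null)).getCommands();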
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 7918821..f79326f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -18,10 +18,17 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,9 +49,8 @@ public class TestBlockStorageMovementAttemptedItems {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
         Mockito.mock(Namesystem.class),
         Mockito.mock(StoragePolicySatisfier.class), 100);
-    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
-        selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
+        selfRetryTimeout, unsatisfiedStorageMovementFiles);
   }
 
   @After
@@ -76,120 +82,115 @@ public class TestBlockStorageMovementAttemptedItems {
     return isItemFound;
   }
 
+  /**
+   * Verify that reported moved blocks are queued up in the block info list.
+   */
   @Test(timeout = 30000)
-  public void testAddResultWithFailureResult() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
-    Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
-    assertTrue(checkItemMovedForRetry(item, 200));
-  }
-
-  @Test(timeout = 30000)
-  public void testAddResultWithSucessResult() throws Exception {
+  public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
     bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
-    assertFalse(checkItemMovedForRetry(item, 200));
+    List<Block> blocks = new ArrayList<Block>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    Block[] blockArray = new Block[blocks.size()];
+    blocks.toArray(blockArray);
+    bsmAttemptedItems.addReportedMovedBlocks(blockArray);
+    assertEquals("Failed to receive result!", 1,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
   }
 
+  /**
+   * Verify empty moved blocks reporting queue.
+   */
   @Test(timeout = 30000)
-  public void testNoResultAdded() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
+  public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
+    bsmAttemptedItems.start(); // start block movement report monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    // After self retry timeout, it should be added back for retry
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 600));
-    assertEquals("Failed to remove from the attempted list", 0,
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    assertEquals("Shouldn't receive result", 0,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
+    assertEquals("Item doesn't exist in the attempted list", 1,
        bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
-   * first occurrence is #blockStorageMovementResultCheck() and then
+   * Partial block movement with
+   * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here, first occurrence
+   * is #blockStorageMovementReportedItemsCheck() and then
    * #blocksStorageMovementUnReportedItemsCheck().
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried1() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
-
-    // start block movement result monitor thread
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    blocks.add(new Block(5678L));
+    Long trackID = 0L;
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+
+    // start block movement report monitor thread
     bsmAttemptedItems.start();
     assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
+        checkItemMovedForRetry(trackID, 5000));
     assertEquals("Failed to remove from the attempted list", 0,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
-   * first occurrence is #blocksStorageMovementUnReportedItemsCheck() and then
-   * #blockStorageMovementResultCheck().
+   * Partial block movement. Here, first occurrence is
+   * #blocksStorageMovementUnReportedItemsCheck() and then
+   * #blockStorageMovementReportedItemsCheck().
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried2() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
 
     Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
 
     bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
-    bsmAttemptedItems.blockStorageMovementResultCheck();
+    bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
 
     assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
+        checkItemMovedForRetry(trackID, 5000));
     assertEquals("Failed to remove from the attempted list", 0,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with only BlocksStorageMovementResult#FAILURE
-   * result and storageMovementAttemptedItems list is empty.
+   * Partial block movement with only BlocksStorageMoveAttemptFinished report
+   * and storageMovementAttemptedItems list is empty.
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementWithEmptyAttemptedQueue()
       throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item, BlocksStorageMovementResult.Status.FAILURE)});
-    bsmAttemptedItems.blockStorageMovementResultCheck();
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
     assertFalse(
         "Should not add in queue again if it is not there in"
            + " storageMovementAttemptedItems",
-        checkItemMovedForRetry(item, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
-        bsmAttemptedItems.getAttemptedItemsCount());
-  }
-
-  /**
-   * Partial block movement with BlocksStorageMovementResult#FAILURE result and
-   * storageMovementAttemptedItems.
-   */
-  @Test(timeout = 30000)
-  public void testPartialBlockMovementShouldBeRetried4() throws Exception {
-    Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
-    bsmAttemptedItems.blockStorageMovementResultCheck();
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
+        checkItemMovedForRetry(trackID, 5000));
+    assertEquals("Failed to remove from the attempted list", 1,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 87b8e79..2a8777d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -141,7 +141,7 @@ public class TestDeadDatanode {
     DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0,
         null, true, SlowPeerReports.EMPTY_REPORT,
         SlowDiskReports.EMPTY_REPORT,
-        new BlocksStorageMovementResult[0]).getCommands();
+        new BlocksStorageMoveAttemptFinished(null)).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());
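On the NameNode side, the rewritten TestBlockStorageMovementAttemptedItems above exercises the new monitor API directly. The essential flow, condensed from those tests (the AttemptedItemInfo constructor arguments are exactly what the tests pass; the real signature may carry more context than shown):

  // Track an attempted item covering one block, then simulate the DN
  // heartbeat reporting that block's move attempt as finished.
  List<Block> blocks = new ArrayList<>();
  blocks.add(new Block(1234L));
  bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
  bsmAttemptedItems.addReportedMovedBlocks(blocks.toArray(new Block[0]));
  assertEquals(1, bsmAttemptedItems.getMovementFinishedBlocksCount());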
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 57e9f94..70219f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -203,11 +203,11 @@ public class TestStoragePolicySatisfier {
   }
 
   /**
-   * Tests to verify that the block storage movement results will be propagated
+   * Tests to verify that the block storage movement report will be propagated
    * to Namenode via datanode heartbeat.
    */
   @Test(timeout = 300000)
-  public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
+  public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
     try {
       createCluster();
       // Change policy to ONE_SSD
@@ -229,7 +229,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 2, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -276,7 +276,7 @@ public class TestStoragePolicySatisfier {
             fileName, StorageType.DISK, 2, 30000, dfs);
       }
 
-      waitForBlocksMovementResult(files.size(), 30000);
+      waitForBlocksMovementAttemptReport(files.size(), 30000);
     } finally {
       shutdownCluster();
     }
@@ -457,7 +457,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 2, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -630,7 +630,7 @@ public class TestStoragePolicySatisfier {
       // No block movement will be scheduled as there is no target node
       // available with the required storage type.
       waitForAttemptedItems(1, 30000);
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
       DFSTestUtil.waitExpectedStorageType(
           file1, StorageType.ARCHIVE, 1, 30000, dfs);
       DFSTestUtil.waitExpectedStorageType(
@@ -691,7 +691,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 3, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -871,7 +871,7 @@ public class TestStoragePolicySatisfier {
       Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
          .getBlockManager().getDatanodeManager().getDatanodes();
       for (DatanodeDescriptor dd : dns) {
-        assertNull(dd.getBlocksToMoveStorages());
+        assertNull(dd.getBlocksToMoveStorages(1));
       }
 
       // Enable heart beats now
@@ -1224,7 +1224,7 @@ public class TestStoragePolicySatisfier {
   /**
    * Test SPS for batch processing.
    */
-  @Test(timeout = 300000)
+  @Test(timeout = 3000000)
   public void testBatchProcessingForSPSDirectory() throws Exception {
     try {
       StorageType[][] diskTypes = new StorageType[][] {
@@ -1252,7 +1252,7 @@ public class TestStoragePolicySatisfier {
         DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
             30000, dfs);
       }
-      waitForBlocksMovementResult(files.size(), 30000);
+      waitForBlocksMovementAttemptReport(files.size(), 30000);
       String expectedLogMessage = "StorageMovementNeeded queue remaining"
          + " capacity is zero";
       assertTrue("Log output does not contain expected log message: "
@@ -1268,7 +1268,7 @@ public class TestStoragePolicySatisfier {
    * 1. Delete /root when traversing Q
    * 2. U, R, S should not be in queued.
    */
-  @Test
+  @Test(timeout = 300000)
   public void testTraverseWhenParentDeleted() throws Exception {
     StorageType[][] diskTypes = new StorageType[][] {
         {StorageType.DISK, StorageType.ARCHIVE},
@@ -1330,7 +1330,7 @@ public class TestStoragePolicySatisfier {
    * 1. Delete L when traversing Q
    * 2. E, M, U, R, S should not be in queued.
    */
-  @Test
+  @Test(timeout = 300000)
   public void testTraverseWhenRootParentDeleted() throws Exception {
     StorageType[][] diskTypes = new StorageType[][] {
         {StorageType.DISK, StorageType.ARCHIVE},
@@ -1387,6 +1387,82 @@ public class TestStoragePolicySatisfier {
     dfs.delete(new Path("/root"), true);
   }
 
+  /**
+   * Test storage move blocks while under-replicated block tasks exist in the
+   * system, so both will share the max transfer streams.
+   *
+   * 1. Create cluster with 3 datanodes.
+   * 2. Create 20 files with 2 replicas each.
+   * 3. Start 2 more DNs with DISK & SSD types.
+   * 4. Set the replication factor of the first 10 files to 4 to trigger
+   *    replica tasks.
+   * 5. Set the policy to SSD for the second set of files, 11-20.
+   * 6. Call SPS for files 11-20 to trigger move block tasks to the new DNs.
+   * 7. Wait for the under-replication and SPS tasks to complete.
+   */
+  @Test(timeout = 300000)
+  public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
+    try {
+      config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      config.setBoolean(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+          true);
+
+      StorageType[][] storagetypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
+          .storageTypes(storagetypes).build();
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
+
+      // Below files will be used for pending replication block tasks.
+      for (int i=1; i<=20; i++){
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 2,
+            0);
+      }
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.SSD},
+              {StorageType.DISK, StorageType.SSD}};
+      startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      // increase replication factor to 4 for the first 10 files and thus
+      // initiate replica tasks
+      for (int i=1; i<=10; i++){
+        Path filePath = new Path("/file" + i);
+        dfs.setReplication(filePath, (short) 4);
+      }
+
+      // invoke SPS for files 11-20
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        dfs.setStoragePolicy(filePath, "ALL_SSD");
+        dfs.satisfyStoragePolicy(filePath);
+      }
+
+      for (int i = 1; i <= 10; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
+      }
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem());
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private static void createDirectoryTree(DistributedFileSystem dfs)
       throws Exception {
     // tree structure
@@ -1514,18 +1590,19 @@ public class TestStoragePolicySatisfier {
     }, 100, timeout);
   }
 
-  private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
-      int timeout) throws TimeoutException, InterruptedException {
+  private void waitForBlocksMovementAttemptReport(
+      long expectedMovementFinishedBlocksCount, int timeout)
+      throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedBlkMovResultsCount,
-            sps.getAttemptedItemsMonitor().resultsCount());
-        return sps.getAttemptedItemsMonitor()
-            .resultsCount() == expectedBlkMovResultsCount;
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount,
+            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
  }
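The user-visible trigger these tests keep exercising is unchanged by this patch: set a storage policy on a path, then ask the SPS to satisfy it. A minimal sketch of that pattern, assuming `dfs` is a DistributedFileSystem backed by a running MiniDFSCluster with the SPS enabled (the path and policy name are taken from the new test above):

  Path filePath = new Path("/file11");
  dfs.setStoragePolicy(filePath, "ALL_SSD");
  dfs.satisfyStoragePolicy(filePath);
  // block until replicas land on the expected storage type
  DFSTestUtil.waitExpectedStorageType(filePath.toString(), StorageType.SSD,
      2, 30000, dfs);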
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
index fc5d0a5..154ddae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
@@ -180,7 +180,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
     LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
     cluster.triggerHeartbeats();
 
-    waitForBlocksMovementResult(cluster, 1, 60000);
+    waitForBlocksMovementAttemptReport(cluster, 9, 60000);
     // verify storage types and locations
     waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
         9, 60000);
@@ -290,7 +290,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
     LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
     cluster.triggerHeartbeats();
 
-    waitForBlocksMovementResult(cluster, 1, 60000);
+    waitForBlocksMovementAttemptReport(cluster, 5, 60000);
     waitForAttemptedItems(cluster, 1, 30000);
     // verify storage types and locations.
     waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
@@ -556,10 +556,10 @@ public class TestStoragePolicySatisfierWithStripedFile {
     }, 100, timeout);
   }
 
-  // Check whether the block movement result has been arrived at the
+  // Check whether the block movement attempt report has arrived at the
   // Namenode(SPS).
-  private void waitForBlocksMovementResult(MiniDFSCluster cluster,
-      long expectedBlkMovResultsCount, int timeout)
+  private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
+      long expectedMovementFinishedBlocksCount, int timeout)
       throws TimeoutException, InterruptedException {
     BlockManager blockManager = cluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -568,11 +568,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedBlkMovResultsCount,
-            sps.getAttemptedItemsMonitor().resultsCount());
-        return sps.getAttemptedItemsMonitor()
-            .resultsCount() == expectedBlkMovResultsCount;
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount,
+            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
