HDFS-11293: [SPS]: Local DN should be given preference as source node, when target available in same node. Contributed by Yuanbo Liu and Uma Maheswara Rao G
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29ff3425 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29ff3425 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29ff3425 Branch: refs/heads/HDFS-10285 Commit: 29ff34252ba04e0676d4e39bd6a84aafaf72a088 Parents: 67ebe8d Author: Uma Maheswara Rao G <uma.ganguma...@intel.com> Authored: Mon Jan 9 14:37:42 2017 -0800 Committer: Rakesh Radhakrishnan <rake...@apache.org> Committed: Tue Jul 11 18:24:22 2017 +0530 ---------------------------------------------------------------------- .../server/namenode/StoragePolicySatisfier.java | 49 ++++++++++++-- .../namenode/TestStoragePolicySatisfier.java | 71 ++++++++++++++++++++ 2 files changed, 113 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/29ff3425/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index ee59617..b1b1464 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -298,9 +298,25 @@ public class StoragePolicySatisfier implements Runnable { new ArrayList<StorageTypeNodePair>(); List<DatanodeStorageInfo> existingBlockStorages = new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages)); + // if expected type exists in source node already, local movement would be + // possible, so lets find such sources first. + Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator(); + while (iterator.hasNext()) { + DatanodeStorageInfo datanodeStorageInfo = iterator.next(); + if (checkSourceAndTargetTypeExists( + datanodeStorageInfo.getDatanodeDescriptor(), existing, + expectedStorageTypes)) { + sourceWithStorageMap + .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(), + datanodeStorageInfo.getDatanodeDescriptor())); + iterator.remove(); + existing.remove(datanodeStorageInfo.getStorageType()); + } + } + + // Let's find sources for existing types left. for (StorageType existingType : existing) { - Iterator<DatanodeStorageInfo> iterator = - existingBlockStorages.iterator(); + iterator = existingBlockStorages.iterator(); while (iterator.hasNext()) { DatanodeStorageInfo datanodeStorageInfo = iterator.next(); StorageType storageType = datanodeStorageInfo.getStorageType(); @@ -317,7 +333,7 @@ public class StoragePolicySatisfier implements Runnable { findTargetsForExpectedStorageTypes(expectedStorageTypes); foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove( - blockMovingInfos, blockInfo, existing, sourceWithStorageMap, + blockMovingInfos, blockInfo, sourceWithStorageMap, expectedStorageTypes, locsForExpectedStorageTypes); } return foundMatchingTargetNodesForBlock; @@ -366,8 +382,6 @@ public class StoragePolicySatisfier implements Runnable { * - list of block source and target node pair * @param blockInfo * - Block - * @param existing - * - Existing storage types of block * @param sourceWithStorageList * - Source Datanode with storages list * @param expected @@ -379,7 +393,6 @@ public class StoragePolicySatisfier implements Runnable { */ private boolean findSourceAndTargetToMove( List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo, - List<StorageType> existing, List<StorageTypeNodePair> sourceWithStorageList, List<StorageType> expected, StorageTypeNodeMap locsForExpectedStorageTypes) { @@ -403,6 +416,7 @@ public class StoragePolicySatisfier implements Runnable { targetNodes.add(chosenTarget.dn); targetStorageTypes.add(chosenTarget.storageType); chosenNodes.add(chosenTarget.dn); + expected.remove(chosenTarget.storageType); // TODO: We can increment scheduled block count for this node? } } @@ -442,16 +456,20 @@ public class StoragePolicySatisfier implements Runnable { targetNodes.add(chosenTarget.dn); targetStorageTypes.add(chosenTarget.storageType); chosenNodes.add(chosenTarget.dn); + expected.remove(chosenTarget.storageType); // TODO: We can increment scheduled block count for this node? } else { LOG.warn( "Failed to choose target datanode for the required" + " storage types {}, block:{}, existing storage type:{}", expected, blockInfo, existingTypeNodePair.storageType); - foundMatchingTargetNodesForBlock = false; } } + if (expected.size() > 0) { + foundMatchingTargetNodesForBlock = false; + } + blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes, targetNodes, targetStorageTypes)); return foundMatchingTargetNodesForBlock; @@ -616,6 +634,23 @@ public class StoragePolicySatisfier implements Runnable { return max; } + private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn, + List<StorageType> existing, List<StorageType> expectedStorageTypes) { + DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos(); + boolean isExpectedTypeAvailable = false; + boolean isExistingTypeAvailable = false; + for (DatanodeStorageInfo dnInfo : allDNStorageInfos) { + StorageType storageType = dnInfo.getStorageType(); + if (existing.contains(storageType)) { + isExistingTypeAvailable = true; + } + if (expectedStorageTypes.contains(storageType)) { + isExpectedTypeAvailable = true; + } + } + return isExistingTypeAvailable && isExpectedTypeAvailable; + } + private static class StorageTypeNodeMap { private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap = new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/29ff3425/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 718dbcb..9abb78d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -580,6 +580,77 @@ public class TestStoragePolicySatisfier { } } + /** + * Tests that moving block storage with in the same datanode. Let's say we + * have DN1[DISK,ARCHIVE], DN2[DISK, SSD], DN3[DISK,RAM_DISK] when + * storagepolicy set to ONE_SSD and request satisfyStoragePolicy, then block + * should move to DN2[SSD] successfully. + */ + @Test(timeout = 300000) + public void testBlockMoveInSameDatanodeWithONESSD() throws Exception { + StorageType[][] diskTypes = + new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.SSD}, + {StorageType.DISK, StorageType.RAM_DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + try { + hdfsCluster = startCluster(config, diskTypes, numOfDatanodes, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + writeContent(file); + + // Change policy to ONE_SSD + dfs.setStoragePolicy(new Path(file), "ONE_SSD"); + FSNamesystem namesystem = hdfsCluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode(file); + + namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + hdfsCluster.triggerHeartbeats(); + waitExpectedStorageType(file, StorageType.SSD, 1, 30000); + waitExpectedStorageType(file, StorageType.DISK, 2, 30000); + + } finally { + shutdownCluster(); + } + } + + /** + * Tests that moving block storage with in the same datanode and remote node. + * Let's say we have DN1[DISK,ARCHIVE], DN2[ARCHIVE, SSD], DN3[DISK,DISK], + * DN4[DISK,DISK] when storagepolicy set to WARM and request + * satisfyStoragePolicy, then block should move to DN1[ARCHIVE] and + * DN2[ARCHIVE] successfully. + */ + @Test(timeout = 300000) + public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception { + StorageType[][] diskTypes = + new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}}; + + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + try { + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + writeContent(file); + + // Change policy to WARM + dfs.setStoragePolicy(new Path(file), "WARM"); + FSNamesystem namesystem = hdfsCluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode(file); + + namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + hdfsCluster.triggerHeartbeats(); + + waitExpectedStorageType(file, StorageType.DISK, 1, 30000); + waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000); + } finally { + shutdownCluster(); + } + } + private String createFileAndSimulateFavoredNodes(int favoredNodesCount) throws IOException { ArrayList<DataNode> dns = hdfsCluster.getDataNodes(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org