HDFS-11293: [SPS]: Local DN should be given preference as source node, when target available in same node. Contributed by Yuanbo Liu and Uma Maheswara Rao G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29ff3425
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29ff3425
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29ff3425

Branch: refs/heads/HDFS-10285
Commit: 29ff34252ba04e0676d4e39bd6a84aafaf72a088
Parents: 67ebe8d
Author: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Authored: Mon Jan 9 14:37:42 2017 -0800
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Tue Jul 11 18:24:22 2017 +0530

----------------------------------------------------------------------
 .../server/namenode/StoragePolicySatisfier.java | 49 ++++++++++++--
 .../namenode/TestStoragePolicySatisfier.java    | 71 ++++++++++++++++++++
 2 files changed, 113 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
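For readers following the change: here is a minimal, standalone sketch of the
two-pass source selection this patch introduces. All names in it
(LocalFirstSourceSelection, Replica, chooseSources) are hypothetical stand-ins
for the real DatanodeStorageInfo/StorageTypeNodePair machinery, simplified to
show only the idea: first choose replicas whose datanode also carries one of
the expected target types, so the move can stay node-local; then fall back to
the remaining replicas for whatever existing types are still unserved.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;

public class LocalFirstSourceSelection {

  enum StorageType { DISK, SSD, ARCHIVE, RAM_DISK }

  /** Hypothetical stand-in for DatanodeStorageInfo: the storage type a
   *  replica lives on, plus all storage types present on the same node. */
  static class Replica {
    final StorageType type;
    final EnumSet<StorageType> typesOnNode;

    Replica(StorageType type, EnumSet<StorageType> typesOnNode) {
      this.type = type;
      this.typesOnNode = typesOnNode;
    }

    @Override
    public String toString() {
      return type + "@" + typesOnNode;
    }
  }

  /**
   * Chooses source replicas, preferring datanodes that already host one of
   * the expected target types so the block can move within the same node
   * instead of across the network.
   */
  static List<Replica> chooseSources(List<Replica> replicas,
      List<StorageType> existing, List<StorageType> expected) {
    List<Replica> sources = new ArrayList<Replica>();
    // Pass 1: replicas whose node can also serve as the target.
    Iterator<Replica> it = replicas.iterator();
    while (it.hasNext()) {
      Replica r = it.next();
      boolean expectedTypeOnNode = false;
      for (StorageType t : expected) {
        if (r.typesOnNode.contains(t)) {
          expectedTypeOnNode = true;
          break;
        }
      }
      if (existing.contains(r.type) && expectedTypeOnNode) {
        sources.add(r);
        it.remove();
        existing.remove(r.type);
      }
    }
    // Pass 2: any remaining replica for the existing types still unserved.
    for (StorageType t : new ArrayList<StorageType>(existing)) {
      it = replicas.iterator();
      while (it.hasNext()) {
        Replica r = it.next();
        if (r.type == t) {
          sources.add(r);
          it.remove();
          break;
        }
      }
    }
    return sources;
  }

  public static void main(String[] args) {
    List<Replica> replicas = new ArrayList<Replica>(Arrays.asList(
        new Replica(StorageType.DISK,
            EnumSet.of(StorageType.DISK, StorageType.ARCHIVE)),
        new Replica(StorageType.DISK,
            EnumSet.of(StorageType.DISK, StorageType.SSD))));
    List<StorageType> existing =
        new ArrayList<StorageType>(Arrays.asList(StorageType.DISK));
    List<StorageType> expected = Arrays.asList(StorageType.SSD);
    // Prints only the SSD-colocated replica: it is preferred as the source
    // because the move to SSD can then happen on the same datanode.
    System.out.println(chooseSources(replicas, existing, expected));
  }
}

This mirrors the first hunk below, where matched storages are removed from
existingBlockStorages and their types from existing before the fallback loop
over the remaining types runs.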


http://git-wip-us.apache.org/repos/asf/hadoop/blob/29ff3425/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index ee59617..b1b1464 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -298,9 +298,25 @@ public class StoragePolicySatisfier implements Runnable {
           new ArrayList<StorageTypeNodePair>();
       List<DatanodeStorageInfo> existingBlockStorages =
           new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+      // if expected type exists in source node already, local movement would be
+      // possible, so let's find such sources first.
+      Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
+      while (iterator.hasNext()) {
+        DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+        if (checkSourceAndTargetTypeExists(
+            datanodeStorageInfo.getDatanodeDescriptor(), existing,
+            expectedStorageTypes)) {
+          sourceWithStorageMap
+              .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
+                  datanodeStorageInfo.getDatanodeDescriptor()));
+          iterator.remove();
+          existing.remove(datanodeStorageInfo.getStorageType());
+        }
+      }
+
+      // Let's find sources for the remaining existing types.
       for (StorageType existingType : existing) {
-        Iterator<DatanodeStorageInfo> iterator =
-            existingBlockStorages.iterator();
+        iterator = existingBlockStorages.iterator();
         while (iterator.hasNext()) {
           DatanodeStorageInfo datanodeStorageInfo = iterator.next();
           StorageType storageType = datanodeStorageInfo.getStorageType();
@@ -317,7 +333,7 @@ public class StoragePolicySatisfier implements Runnable {
           findTargetsForExpectedStorageTypes(expectedStorageTypes);
 
       foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
-          blockMovingInfos, blockInfo, existing, sourceWithStorageMap,
+          blockMovingInfos, blockInfo, sourceWithStorageMap,
           expectedStorageTypes, locsForExpectedStorageTypes);
     }
     return foundMatchingTargetNodesForBlock;
@@ -366,8 +382,6 @@ public class StoragePolicySatisfier implements Runnable {
    *          - list of block source and target node pair
    * @param blockInfo
    *          - Block
-   * @param existing
-   *          - Existing storage types of block
    * @param sourceWithStorageList
    *          - Source Datanode with storages list
    * @param expected
@@ -379,7 +393,6 @@ public class StoragePolicySatisfier implements Runnable {
    */
   private boolean findSourceAndTargetToMove(
       List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
-      List<StorageType> existing,
       List<StorageTypeNodePair> sourceWithStorageList,
       List<StorageType> expected,
       StorageTypeNodeMap locsForExpectedStorageTypes) {
@@ -403,6 +416,7 @@ public class StoragePolicySatisfier implements Runnable {
         targetNodes.add(chosenTarget.dn);
         targetStorageTypes.add(chosenTarget.storageType);
         chosenNodes.add(chosenTarget.dn);
+        expected.remove(chosenTarget.storageType);
         // TODO: We can increment scheduled block count for this node?
       }
     }
@@ -442,16 +456,20 @@ public class StoragePolicySatisfier implements Runnable {
         targetNodes.add(chosenTarget.dn);
         targetStorageTypes.add(chosenTarget.storageType);
         chosenNodes.add(chosenTarget.dn);
+        expected.remove(chosenTarget.storageType);
         // TODO: We can increment scheduled block count for this node?
       } else {
         LOG.warn(
             "Failed to choose target datanode for the required"
                 + " storage types {}, block:{}, existing storage type:{}",
             expected, blockInfo, existingTypeNodePair.storageType);
-        foundMatchingTargetNodesForBlock = false;
       }
     }
 
+    if (expected.size() > 0) {
+      foundMatchingTargetNodesForBlock = false;
+    }
+
     blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
         sourceStorageTypes, targetNodes, targetStorageTypes));
     return foundMatchingTargetNodesForBlock;
@@ -616,6 +634,23 @@ public class StoragePolicySatisfier implements Runnable {
     return max;
   }
 
+  private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
+      List<StorageType> existing, List<StorageType> expectedStorageTypes) {
+    DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
+    boolean isExpectedTypeAvailable = false;
+    boolean isExistingTypeAvailable = false;
+    for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
+      StorageType storageType = dnInfo.getStorageType();
+      if (existing.contains(storageType)) {
+        isExistingTypeAvailable = true;
+      }
+      if (expectedStorageTypes.contains(storageType)) {
+        isExpectedTypeAvailable = true;
+      }
+    }
+    return isExistingTypeAvailable && isExpectedTypeAvailable;
+  }
+
   private static class StorageTypeNodeMap {
     private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
         new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);

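A note on the retry bookkeeping in the hunks above: rather than clearing
foundMatchingTargetNodesForBlock on every failed target choice, the patch now
removes each satisfied storage type from the expected list and reports failure
only when types are left over, so the satisfier retries the block just for
those leftovers. The class below is a hypothetical, self-contained
illustration of that pattern, not code from the commit:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExpectedTypeBookkeeping {

  enum StorageType { DISK, SSD, ARCHIVE }

  // Consume one expected type per chosen target, then report success only
  // if nothing remains; leftovers mean the block must be retried later.
  static boolean allExpectedTypesSatisfied(List<StorageType> expected,
      List<StorageType> chosenTargetTypes) {
    for (StorageType t : chosenTargetTypes) {
      expected.remove(t);
    }
    return expected.isEmpty();
  }

  public static void main(String[] args) {
    List<StorageType> expected = new ArrayList<StorageType>(
        Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE));
    // Only one ARCHIVE target was found, so one expected type is left and
    // this prints false: the block stays unsatisfied and gets a retry.
    System.out.println(allExpectedTypesSatisfied(expected,
        Arrays.asList(StorageType.ARCHIVE)));
  }
}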
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29ff3425/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 718dbcb..9abb78d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -580,6 +580,77 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests block storage movement within the same datanode. Say we have
+   * DN1[DISK,ARCHIVE], DN2[DISK,SSD], DN3[DISK,RAM_DISK]; when the storage
+   * policy is set to ONE_SSD and satisfyStoragePolicy is requested, the
+   * block should move to DN2[SSD] successfully.
+   */
+  @Test(timeout = 300000)
+  public void testBlockMoveInSameDatanodeWithONESSD() throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.RAM_DISK}};
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    try {
+      hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(file);
+
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(file), "ONE_SSD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests block storage movement within the same datanode and to a remote
+   * node. Say we have DN1[DISK,ARCHIVE], DN2[ARCHIVE,SSD], DN3[DISK,DISK],
+   * DN4[DISK,DISK]; when the storage policy is set to WARM and
+   * satisfyStoragePolicy is requested, the block should move to DN1[ARCHIVE]
+   * and DN2[ARCHIVE] successfully.
+   */
+  @Test(timeout = 300000)
+  public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.SSD},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK}};
+
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    try {
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(file);
+
+      // Change policy to WARM
+      dfs.setStoragePolicy(new Path(file), "WARM");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+
+      waitExpectedStorageType(file, StorageType.DISK, 1, 30000);
+      waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();

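To exercise the new tests locally, a standard Maven Surefire invocation from
the repository root should work, assuming a checkout of the HDFS-10285 branch:

  mvn -pl hadoop-hdfs-project/hadoop-hdfs test -Dtest=TestStoragePolicySatisfier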
