HDFS-11151. [SPS]: StoragePolicySatisfier should gracefully handle the case where
there is no target node with the required storage type. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/871c537b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/871c537b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/871c537b

Branch: refs/heads/HDFS-10285
Commit: 871c537bd3b36014693a4ce0c74deda1a0e4cec5
Parents: 7abdf98
Author: Rakesh Radhakrishnan <rake...@apache.org>
Authored: Sun Nov 27 11:15:26 2016 +0530
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Wed Nov 8 14:04:27 2017 +0530

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |   2 +-
 .../datanode/BlockStorageMovementTracker.java   |  30 ++++--
 .../datanode/StoragePolicySatisfyWorker.java    |  20 +++-
 .../BlockStorageMovementAttemptedItems.java     |   4 +
 .../server/namenode/StoragePolicySatisfier.java |  53 ++++++---
 .../namenode/TestStoragePolicySatisfier.java    | 108 ++++++++++++++++++-
 6 files changed, 186 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/871c537b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 88b6874..953860b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4884,7 +4884,7 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void satisfyStoragePolicy(long id) {
     storageMovementNeeded.add(id);
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("Added block collection id {} to block "
           + "storageMovementNeeded queue", id);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/871c537b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index d31f075..2de88fc 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -27,8 +27,9 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import 
org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
 import 
org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import 
org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
+import 
org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,15 +109,32 @@ public class BlockStorageMovementTracker implements 
Runnable {
     }
   }
 
+  /**
+   * Mark as block movement failure for the given trackId and blockId.
+   *
+   * @param trackId tracking id
+   * @param blockId block id
+   */
+  void markBlockMovementFailure(long trackId, long blockId) {
+    LOG.debug("Mark as block movement failure for the given "
+        + "trackId:{} and blockId:{}", trackId, blockId);
+    BlockMovementResult result = new BlockMovementResult(trackId, blockId, 
null,
+        BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE);
+    addMovementResultToTrackIdList(result);
+  }
+
   private List<BlockMovementResult> addMovementResultToTrackIdList(
       BlockMovementResult result) {
     long trackId = result.getTrackId();
-    List<BlockMovementResult> perTrackIdList = movementResults.get(trackId);
-    if (perTrackIdList == null) {
-      perTrackIdList = new ArrayList<>();
-      movementResults.put(trackId, perTrackIdList);
+    List<BlockMovementResult> perTrackIdList;
+    synchronized (movementResults) {
+      perTrackIdList = movementResults.get(trackId);
+      if (perTrackIdList == null) {
+        perTrackIdList = new ArrayList<>();
+        movementResults.put(trackId, perTrackIdList);
+      }
+      perTrackIdList.add(result);
     }
-    perTrackIdList.add(result);
     return perTrackIdList;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/871c537b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 1bd851e..a69a38b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -151,14 +151,24 @@ public class StoragePolicySatisfyWorker {
    */
   public void processBlockMovingTasks(long trackID, String blockPoolID,
       Collection<BlockMovingInfo> blockMovingInfos) {
+    LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      assert blkMovingInfo
-          .getSources().length == blkMovingInfo.getTargets().length;
-
-      for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
+      // Iterating backwards. This is to ensure that all the block src location
+      // which doesn't have a target node will be marked as failure before
+      // scheduling the block movement to valid target nodes.
+      for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) {
+        if (i >= blkMovingInfo.getTargets().length) {
+          // Since there is no target selected for scheduling the block,
+          // just mark this block storage movement as failure. Later, namenode
+          // can take action on this.
+          movementTracker.markBlockMovementFailure(trackID,
+              blkMovingInfo.getBlock().getBlockId());
+          continue;
+        }
+        DatanodeInfo target = blkMovingInfo.getTargets()[i];
         BlockMovingTask blockMovingTask = new BlockMovingTask(
             trackID, blockPoolID, blkMovingInfo.getBlock(),
-            blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
+            blkMovingInfo.getSources()[i], target,
             blkMovingInfo.getSourceStorageTypes()[i],
             blkMovingInfo.getTargetStorageTypes()[i]);
         Future<BlockMovementResult> moveCallable = moverCompletionService

http://git-wip-us.apache.org/repos/asf/hadoop/blob/871c537b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 580d0d6..5457dc2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -218,4 +218,8 @@ public class BlockStorageMovementAttemptedItems {
     return storageMovementAttemptedResults.size();
   }
 
+  @VisibleForTesting
+  public int getAttemptedItemsCount() {
+    return storageMovementAttemptedItems.size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/871c537b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 4967a89..617ab2c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -211,6 +211,14 @@ public class StoragePolicySatisfier implements Runnable {
       }
     }
 
+    addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos,
+        coordinatorNode);
+  }
+
+  private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
+      List<BlockMovingInfo> blockMovingInfos,
+      DatanodeDescriptor coordinatorNode) {
+
     if (blockMovingInfos.size() < 1) {
       // TODO: Major: handle this case. I think we need retry cases to
       // be implemented. Idea is, if some files are not getting storage movement
@@ -218,6 +226,20 @@ public class StoragePolicySatisfier implements Runnable {
       return;
     }
 
+    boolean needBlockStorageMovement = false;
+    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+      // Check whether at least one block storage movement has been chosen
+      if (blkMovingInfo.getTargets().length > 0){
+        needBlockStorageMovement = true;
+        break;
+      }
+    }
+    if (!needBlockStorageMovement) {
+      // Simply return as there are no targets selected for scheduling the block
+      // movement.
+      return;
+    }
+
     // 'BlockCollectionId' is used as the tracking ID. All the blocks under this
     // blockCollectionID will be added to this datanode.
     coordinatorNode.addBlocksToMoveStorage(blockCollectionID, 
blockMovingInfos);
@@ -251,9 +273,8 @@ public class StoragePolicySatisfier implements Runnable {
     List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
     for (int i = 0; i < sourceWithStorageList.size(); i++) {
       StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
-      StorageTypeNodePair chosenTarget =
-          chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected,
-              locsForExpectedStorageTypes, chosenNodes);
+      StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
+          existingTypeNodePair.dn, expected);
 
       if (chosenTarget == null && blockManager.getDatanodeManager()
           .getNetworkTopology().isNodeGroupAware()) {
@@ -282,15 +303,14 @@ public class StoragePolicySatisfier implements Runnable {
         chosenNodes.add(chosenTarget.dn);
         // TODO: We can increment scheduled block count for this node?
       } else {
-        // TODO: Failed to ChooseTargetNodes...So let just retry. Shall we
-        // proceed without this targets? Then what should be final result?
-        // How about pack empty target, means target node could not be chosen ,
-        // so result should be RETRY_REQUIRED from DN always.
-        // Log..unable to choose target node for source datanodeDescriptor
+        LOG.warn(
+            "Failed to choose target datanode for the required"
+                + " storage types {}, block:{}, existing storage type:{}",
+            expected, blockInfo, existingTypeNodePair.storageType);
         sourceNodes.add(existingTypeNodePair.dn);
         sourceStorageTypes.add(existingTypeNodePair.storageType);
-        targetNodes.add(null);
-        targetStorageTypes.add(null);
+        // Imp: Not setting the target details, empty targets. Later, this is
+        // used as an indicator for retrying this block movement.
       }
     }
     BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
@@ -302,15 +322,13 @@ public class StoragePolicySatisfier implements Runnable {
   }
 
   /**
-   * Choose the target storage within same Datanode if possible.
+   * Choose the target storage within same datanode if possible.
    *
-   * @param locsForExpectedStorageTypes
-   * @param chosenNodes
+   * @param source source datanode
+   * @param targetTypes list of target storage types
    */
   private StorageTypeNodePair chooseTargetTypeInSameNode(
-      DatanodeDescriptor source, List<StorageType> targetTypes,
-      StorageTypeNodeMap locsForExpectedStorageTypes,
-      List<DatanodeDescriptor> chosenNodes) {
+      DatanodeDescriptor source, List<StorageType> targetTypes) {
     for (StorageType t : targetTypes) {
       DatanodeStorageInfo chooseStorage4Block =
           source.chooseStorage4Block(t, 0);
@@ -328,6 +346,9 @@ public class StoragePolicySatisfier implements Runnable {
     for (StorageType t : targetTypes) {
       List<DatanodeDescriptor> nodesWithStorages =
           locsForExpectedStorageTypes.getNodesWithStorages(t);
+      if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
+        continue; // no target nodes with the required storage type.
+      }
       Collections.shuffle(nodesWithStorages);
       for (DatanodeDescriptor target : nodesWithStorages) {
         if (!chosenNodes.contains(target) && matcher.match(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/871c537b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 901e1ba..499fe3c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Supplier;
 
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
-
 /**
  * Tests that StoragePolicySatisfier daemon is able to check the blocks to be
  * moved and finding its suggested target locations to move.
@@ -79,7 +79,7 @@ public class TestStoragePolicySatisfier {
       throws Exception {
 
     try {
-      // Change policy to ALL_SSD
+      // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), "COLD");
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
@@ -356,6 +356,108 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests to verify that for the given path, some of the blocks or block src
+   * locations(src nodes) under the given path will be scheduled for block
+   * movement.
+   *
+   * For example, there are two blocks for a file:
+   *
+   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
+   * Only one datanode is available with storage type ARCHIVE, say D.
+   *
+   * SPS will schedule block movement to the coordinator node with the details,
+   * blk_1[move A(DISK) -> D(ARCHIVE)], blk_2[move A(DISK) -> D(ARCHIVE)].
+   */
+  @Test(timeout = 300000)
+  public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
+      throws Exception {
+    try {
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
+
+      // Adding ARCHIVE based datanodes.
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      // Wait till StoragePolicySatisfier identifies the block to move to
+      // ARCHIVE area.
+      waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+
+      waitForBlocksMovementResult(1, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests to verify that for the given path, no blocks or block src
+   * locations(src nodes) under the given path will be scheduled for block
+   * movement as there is no datanode available with the required storage type.
+   *
+   * For example, there are two blocks for a file:
+   *
+   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
+   * No datanode is available with storage type ARCHIVE.
+   *
+   * SPS won't schedule any block movement for this path.
+   */
+  @Test(timeout = 300000)
+  public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
+      throws Exception {
+    try {
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
+      // Adding DISK based datanodes
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+
+      // No block movement will be scheduled as there is no target node available
+      // with the required storage type.
+      waitForAttemptedItems(1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+      // Since there is no target node the item will get timed out and then
+      // re-attempted.
+      waitForAttemptedItems(1, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
+      int timeout) throws TimeoutException, InterruptedException {
+    BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+    final StoragePolicySatisfier sps = 
blockManager.getStoragePolicySatisfier();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
+            expectedBlkMovAttemptedCount,
+            sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
+        return sps.getAttemptedItemsMonitor()
+            .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+      }
+    }, 100, timeout);
+  }
+
   private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
       int timeout) throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to