Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 ffb1b2996 -> 04ed6dd1f


HDFS-12106: [SPS]: Improve storage policy satisfier configurations. Contributed by Surendra Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04ed6dd1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04ed6dd1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04ed6dd1

Branch: refs/heads/HDFS-10285
Commit: 04ed6dd1f737cd704d54ee3e053c2a1cca31572d
Parents: ffb1b29
Author: Surendra Singh Lilhore <surendralilh...@apache.org>
Authored: Wed Nov 15 20:22:27 2017 +0530
Committer: Surendra Singh Lilhore <surendralilh...@apache.org>
Committed: Wed Nov 15 20:22:27 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/HdfsConstants.java     |  6 +++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  4 ++
 .../src/main/proto/ClientNamenodeProtocol.proto |  3 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   | 10 ++--
 .../server/blockmanagement/DatanodeManager.java | 12 ++---
 .../datanode/StoragePolicySatisfyWorker.java    |  3 +-
 .../BlockStorageMovementAttemptedItems.java     |  8 +--
 .../namenode/BlockStorageMovementNeeded.java    | 46 ++++++++++++----
 .../hdfs/server/namenode/FSNamesystem.java      |  3 ++
 .../server/namenode/StoragePolicySatisfier.java | 42 ++++++++++++---
 .../hadoop/hdfs/tools/StoragePolicyAdmin.java   | 27 +++++++---
 .../src/main/resources/hdfs-default.xml         | 17 ++++--
 .../src/site/markdown/ArchivalStorage.md        |  2 +-
 .../TestBlockStorageMovementAttemptedItems.java | 10 ++--
 .../namenode/TestStoragePolicySatisfier.java    | 57 ++++++++++++++++++--
 15 files changed, 199 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index aa0496e..c09c61c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -147,6 +147,12 @@ public final class HdfsConstants {
     SUCCESS,
 
     /**
+     * A few blocks failed to move, so the path does not yet
+     * fully satisfy the storage policy.
+     */
+    FAILURE,
+
+    /**
      * Status not available.
      */
     NOT_AVAILABLE

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 6507009..bea5915 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -3118,6 +3118,8 @@ public class PBHelperClient {
       return StoragePolicySatisfyPathStatus.IN_PROGRESS;
     case SUCCESS:
       return StoragePolicySatisfyPathStatus.SUCCESS;
+    case FAILURE:
+      return StoragePolicySatisfyPathStatus.FAILURE;
     case NOT_AVAILABLE:
       return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
     default:
@@ -3134,6 +3136,8 @@ public class PBHelperClient {
       return HdfsConstants.StoragePolicySatisfyPathStatus.IN_PROGRESS;
     case SUCCESS:
       return HdfsConstants.StoragePolicySatisfyPathStatus.SUCCESS;
+    case FAILURE:
+      return HdfsConstants.StoragePolicySatisfyPathStatus.FAILURE;
     case NOT_AVAILABLE:
       return HdfsConstants.StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index d31856f..c60c035 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -816,7 +816,8 @@ message CheckStoragePolicySatisfyPathStatusResponseProto {
     PENDING = 0;
     IN_PROGRESS = 1;
     SUCCESS = 2;
-    NOT_AVAILABLE = 3;
+    FAILURE = 3;
+    NOT_AVAILABLE = 4;
   }
   required StoragePolicySatisfyPathStatus status = 1;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 879c300..ed14f77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -585,10 +585,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.storage.policy.satisfier.self.retry.timeout.millis";
   public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
       5 * 60 * 1000;
-  public static final String DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY =
+  public static final String DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY =
+      "dfs.storage.policy.satisfier.retry.max.attempts";
+  public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
+      3;
+  public static final String DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY =
       "dfs.storage.policy.satisfier.low.max-streams.preference";
-  public static final boolean DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT =
-      false;
+  public static final boolean DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT =
+      true;
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 9866;
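
For reference, the renamed and newly added keys above can also be set programmatically. Below is a minimal sketch using only the constants introduced in this patch; the class name SpsRetryConfigSketch and the chosen values are illustrative, not part of the change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class SpsRetryConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Allow up to 5 retries per path instead of the default 3.
        conf.setInt(
            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, 5);
        // Keep block-move tasks at low priority against replication/EC streams
        // (true is the new default introduced by this change).
        conf.setBoolean(
            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
            true);
        System.out.println("SPS max retry attempts: " + conf.getInt(
            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT));
      }
    }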

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 4ea41d9..5cc4804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -208,7 +208,7 @@ public class DatanodeManager {
    */
   private final long timeBetweenResendingCachingDirectivesMs;
 
-  private final boolean blocksToMoveShareEqualRatio;
+  private final boolean blocksToMoveLowPriority;
 
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
@@ -337,9 +337,9 @@ public class DatanodeManager {
 
     // SPS configuration to decide blocks to move can share equal ratio of
     // maxtransfers with pending replica and erasure-coded reconstruction tasks
-    blocksToMoveShareEqualRatio = conf.getBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT);
+    blocksToMoveLowPriority = conf.getBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT);
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,
@@ -1697,11 +1697,11 @@ public class DatanodeManager {
       int numReplicationTasks = 0;
       int numECTasks = 0;
       int numBlocksToMoveTasks = 0;
-      // Check blocksToMoveShareEqualRatio configuration is true/false. If true,
+      // Check blocksToMoveLowPriority configuration is true/false. If false,
       // then equally sharing the max transfer. Otherwise gives high priority to
       // the pending_replica/erasure-coded tasks and only the delta streams will
       // be used for blocks to move tasks.
-      if (blocksToMoveShareEqualRatio) {
+      if (!blocksToMoveLowPriority) {
         // add blocksToMove count to total blocks so that will get equal share
         totalBlocks = totalBlocks + totalBlocksToMove;
         numReplicationTasks = (int) Math
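
To make the sharing scheme above concrete, here is a self-contained sketch of the split for one heartbeat. The exact ceil arithmetic in DatanodeManager is truncated in this hunk, so the sketch only approximates it; all names here are hypothetical:

    public final class MaxStreamsSplitSketch {
      /**
       * Returns {replicationTasks, ecTasks, blocksToMoveTasks} for one heartbeat.
       */
      static int[] split(int maxTransfers, int replBlocks, int ecBlocks,
          int blocksToMove, boolean blocksToMoveLowPriority) {
        int totalBlocks = replBlocks + ecBlocks;
        if (!blocksToMoveLowPriority) {
          // Equal share: block moves compete for the same stream budget.
          totalBlocks += blocksToMove;
        }
        if (totalBlocks == 0) {
          return new int[] {0, 0, 0};
        }
        int replTasks =
            (int) Math.ceil((double) maxTransfers * replBlocks / totalBlocks);
        int ecTasks =
            (int) Math.ceil((double) maxTransfers * ecBlocks / totalBlocks);
        // Either way, block moves only get whatever streams remain.
        int moveTasks = Math.max(0, maxTransfers - replTasks - ecTasks);
        return new int[] {replTasks, ecTasks, moveTasks};
      }

      public static void main(String[] args) {
        // With low priority (the new default), replication/EC consume the
        // budget first and block moves take the leftover delta.
        int[] shares = split(10, 6, 2, 8, true);
        System.out.printf("repl=%d ec=%d moves=%d%n",
            shares[0], shares[1], shares[2]);
      }
    }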

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 47318f8..9a9c7e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -196,7 +196,8 @@ public class StoragePolicySatisfyWorker {
    * This class encapsulates the process of moving the block replica to the
    * given target and wait for the response.
    */
-  private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> {
+  private class BlockMovingTask implements
+      Callable<BlockMovementAttemptFinished> {
     private final String blockPoolID;
     private final Block block;
     private final DatanodeInfo source;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index cc5b63a..643255f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -183,7 +183,7 @@ public class BlockStorageMovementAttemptedItems {
           Long blockCollectionID = itemInfo.getTrackId();
           synchronized (movementFinishedBlocks) {
             ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
-                blockCollectionID);
+                blockCollectionID, itemInfo.getRetryCount() + 1);
             blockStorageMovementNeeded.add(candidate);
             iter.remove();
             LOG.info("TrackID: {} becomes timed out and moved to needed "
@@ -211,9 +211,9 @@ public class BlockStorageMovementAttemptedItems {
               // TODO: try add this at front of the Queue, so that this element
               // gets the chance first and can be cleaned from queue quickly as
               // all movements already done.
-              blockStorageMovementNeeded
-                  .add(new ItemInfo(attemptedItemInfo.getStartId(),
-                      attemptedItemInfo.getTrackId()));
+              blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
+                  .getStartId(), attemptedItemInfo.getTrackId(),
+                  attemptedItemInfo.getRetryCount() + 1));
               iterator.remove();
             }
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
index 8f7487c..89bcbff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -178,8 +178,8 @@ public class BlockStorageMovementNeeded {
    * Decrease the pending child count for directory once one file blocks moved
    * successfully. Remove the SPS xAttr if pending child count is zero.
    */
-  public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
-      throws IOException {
+  public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
+      boolean isSuccess) throws IOException {
     if (trackInfo.isDir()) {
       // If track is part of some start inode then reduce the pending
       // directory work count.
@@ -188,7 +188,7 @@ public class BlockStorageMovementNeeded {
       if (inode == null) {
         // directory deleted just remove it.
         this.pendingWorkForDirectory.remove(startId);
-        markSuccess(startId);
+        updateStatus(startId, isSuccess);
       } else {
         DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
         if (pendingWork != null) {
@@ -196,8 +196,10 @@ public class BlockStorageMovementNeeded {
           if (pendingWork.isDirWorkDone()) {
             namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
             pendingWorkForDirectory.remove(startId);
-            markSuccess(startId);
+            pendingWork.setFailure(!isSuccess);
+            updateStatus(startId, pendingWork.isPolicySatisfied());
           }
+          pendingWork.setFailure(isSuccess);
         }
       }
     } else {
@@ -205,7 +207,7 @@ public class BlockStorageMovementNeeded {
       // storageMovementAttemptedItems or file policy satisfied.
       namesystem.removeXattr(trackInfo.getTrackId(),
           XATTR_SATISFY_STORAGE_POLICY);
-      markSuccess(trackInfo.getStartId());
+      updateStatus(trackInfo.getStartId(), isSuccess);
     }
   }
 
@@ -224,14 +226,19 @@ public class BlockStorageMovementNeeded {
   /**
    * Mark inode status as SUCCESS in map.
    */
-  private void markSuccess(long startId){
+  private void updateStatus(long startId, boolean isSuccess){
     StoragePolicySatisfyPathStatusInfo spsStatusInfo =
         spsStatus.get(startId);
     if (spsStatusInfo == null) {
       spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
       spsStatus.put(startId, spsStatusInfo);
     }
-    spsStatusInfo.setSuccess();
+
+    if (isSuccess) {
+      spsStatusInfo.setSuccess();
+    } else {
+      spsStatusInfo.setFailure();
+    }
   }
 
   /**
@@ -325,7 +332,7 @@ public class BlockStorageMovementNeeded {
                   namesystem.removeXattr(startInode.getId(),
                       XATTR_SATISFY_STORAGE_POLICY);
                   pendingWorkForDirectory.remove(startInode.getId());
-                  markSuccess(startInode.getId());
+                  updateStatus(startInode.getId(), true);
                 }
               }
             }
@@ -431,6 +438,7 @@ public class BlockStorageMovementNeeded {
 
     private int pendingWorkCount = 0;
     private boolean fullyScanned = false;
+    private boolean success = true;
 
     /**
      * Increment the pending work count for directory.
@@ -461,6 +469,20 @@ public class BlockStorageMovementNeeded {
     public synchronized void markScanCompleted() {
       this.fullyScanned = true;
     }
+
+    /**
+     * Return true if block movement succeeded for all the files, otherwise false.
+     */
+    public boolean isPolicySatisfied() {
+      return success;
+    }
+
+    /**
+     * Mark the directory's SPS status as failed.
+     */
+    public void setFailure(boolean failure) {
+      this.success = this.success || failure;
+    }
   }
 
   public void init() {
@@ -510,6 +532,11 @@ public class BlockStorageMovementNeeded {
       this.lastStatusUpdateTime = Time.monotonicNow();
     }
 
+    private void setFailure() {
+      this.status = StoragePolicySatisfyPathStatus.FAILURE;
+      this.lastStatusUpdateTime = Time.monotonicNow();
+    }
+
     private StoragePolicySatisfyPathStatus getStatus() {
       return status;
     }
@@ -518,7 +545,8 @@ public class BlockStorageMovementNeeded {
     * Return true if SUCCESS status cached more than 5 min.
      */
     private boolean canRemove() {
-      return StoragePolicySatisfyPathStatus.SUCCESS == status
+      return (StoragePolicySatisfyPathStatus.SUCCESS == status
+          || StoragePolicySatisfyPathStatus.FAILURE == status)
           && (Time.monotonicNow()
               - lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
     }
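
A small standalone illustration of the new expiry rule above: a terminal status (SUCCESS or FAILURE) becomes evictable only after it has been cached longer than the clearance window. The local enum stands in for the real StoragePolicySatisfyPathStatus, and all names here are illustrative:

    import java.util.concurrent.TimeUnit;

    public class StatusExpirySketch {
      enum Status { PENDING, IN_PROGRESS, SUCCESS, FAILURE, NOT_AVAILABLE }

      // Mirrors canRemove(): terminal status plus an elapsed clearance window.
      static boolean canRemove(Status status, long lastUpdateMs, long nowMs,
          long clearanceMs) {
        boolean terminal = status == Status.SUCCESS || status == Status.FAILURE;
        return terminal && (nowMs - lastUpdateMs) > clearanceMs;
      }

      public static void main(String[] args) {
        long clearance = TimeUnit.MINUTES.toMillis(5); // default window
        long updatedAt = 0L;
        long now = TimeUnit.MINUTES.toMillis(6);
        System.out.println(canRemove(Status.FAILURE, updatedAt, now, clearance));
        // true: FAILURE is terminal and older than 5 minutes
        System.out.println(canRemove(Status.IN_PROGRESS, updatedAt, now, clearance));
        // false: non-terminal statuses are never evicted by this rule
      }
    }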

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fb9e4c3..9678ef1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7622,6 +7622,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     writeLock();
     try {
       final INode inode = dir.getInode(id);
+      if (inode == null) {
+        return;
+      }
       final XAttrFeature xaf = inode.getXAttrFeature();
       if (xaf == null) {
         return;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 2382d36..972e744 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -83,6 +83,7 @@ public class StoragePolicySatisfier implements Runnable {
   private volatile boolean isRunning = false;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
+  private int blockMovementMaxRetry;
   /**
    * Represents the collective analysis status for all blocks.
    */
@@ -137,6 +138,9 @@ public class StoragePolicySatisfier implements Runnable {
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
         storageMovementNeeded);
     this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
+    this.blockMovementMaxRetry = conf.getInt(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
   }
 
   /**
@@ -243,6 +247,13 @@ public class StoragePolicySatisfier implements Runnable {
         if (!namesystem.isInSafeMode()) {
           ItemInfo itemInfo = storageMovementNeeded.get();
           if (itemInfo != null) {
+            if (itemInfo.getRetryCount() >= blockMovementMaxRetry) {
+              LOG.info("Failed to satisfy the policy after "
+                  + blockMovementMaxRetry + " retries. Removing inode "
+                  + itemInfo.getTrackId() + " from the queue");
+              storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
+              continue;
+            }
             long trackId = itemInfo.getTrackId();
             BlockCollection blockCollection;
             BlocksMovingAnalysis status = null;
@@ -253,7 +264,7 @@ public class StoragePolicySatisfier implements Runnable {
               if (blockCollection == null) {
                 // File doesn't exists (maybe got deleted), remove trackId from
                 // the queue
-                storageMovementNeeded.removeItemTrackInfo(itemInfo);
+                storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
               } else {
                 status =
                     analyseBlocksStorageMovementsAndAssignToDN(
@@ -269,9 +280,9 @@ public class StoragePolicySatisfier implements Runnable {
                 // Just add to monitor, so it will be tracked for report and
                 // be removed on storage movement attempt finished report.
               case BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(new AttemptedItemInfo(
-                    itemInfo.getStartId(), itemInfo.getTrackId(),
-                    monotonicNow(), status.assignedBlocks));
+                this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
+                    .getStartId(), itemInfo.getTrackId(), monotonicNow(),
+                    status.assignedBlocks, itemInfo.getRetryCount()));
                 break;
               case NO_BLOCKS_TARGETS_PAIRED:
                 if (LOG.isDebugEnabled()) {
@@ -279,6 +290,7 @@ public class StoragePolicySatisfier implements Runnable {
                       + " back to retry queue as none of the blocks"
                       + " found its eligible targets.");
                 }
+                itemInfo.retryCount++;
                 this.storageMovementNeeded.add(itemInfo);
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
@@ -295,7 +307,7 @@ public class StoragePolicySatisfier implements Runnable {
               default:
                 LOG.info("Block analysis skipped or blocks already satisfied"
                     + " with storages. So, Cleaning up the Xattrs.");
-                storageMovementNeeded.removeItemTrackInfo(itemInfo);
+                storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
                 break;
               }
             }
@@ -861,10 +873,19 @@ public class StoragePolicySatisfier implements Runnable {
   public static class ItemInfo {
     private long startId;
     private long trackId;
+    private int retryCount;
 
     public ItemInfo(long startId, long trackId) {
       this.startId = startId;
       this.trackId = trackId;
+      // Set to 0 when the item is added to the queue for the first time.
+      this.retryCount = 0;
+    }
+
+    public ItemInfo(long startId, long trackId, int retryCount) {
+      this.startId = startId;
+      this.trackId = trackId;
+      this.retryCount = retryCount;
     }
 
     /**
@@ -887,6 +908,13 @@ public class StoragePolicySatisfier implements Runnable {
     public boolean isDir() {
       return (startId != trackId);
     }
+
+    /**
+     * Get the number of retries attempted for satisfying the policy.
+     */
+    public int getRetryCount() {
+      return retryCount;
+    }
   }
 
   /**
@@ -910,8 +938,8 @@ public class StoragePolicySatisfier implements Runnable {
      */
     AttemptedItemInfo(long rootId, long trackId,
         long lastAttemptedOrReportedTime,
-        List<Block> blocks) {
-      super(rootId, trackId);
+        List<Block> blocks, int retryCount) {
+      super(rootId, trackId, retryCount);
       this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
       this.blocks = blocks;
     }
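
Putting the retry plumbing above together: an ItemInfo enters the queue with retryCount 0, each no-target pass or timeout re-queues it with the count incremented, and the satisfier gives up once the configured maximum (3 by default) is reached. A compact simulation of that lifecycle; class and field names are illustrative only:

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class SpsRetrySketch {
      static final int MAX_RETRY = 3; // dfs.storage.policy.satisfier.retry.max.attempts

      static final class Item {
        final long trackId;
        int retryCount; // 0 on first enqueue, as in ItemInfo

        Item(long trackId, int retryCount) {
          this.trackId = trackId;
          this.retryCount = retryCount;
        }
      }

      public static void main(String[] args) {
        Queue<Item> needed = new ArrayDeque<>();
        needed.add(new Item(42L, 0));
        while (!needed.isEmpty()) {
          Item item = needed.poll();
          if (item.retryCount >= MAX_RETRY) {
            // Mirrors removeItemTrackInfo(item, false): status becomes FAILURE.
            System.out.println("Giving up on trackId " + item.trackId
                + " after " + item.retryCount + " retries");
            break;
          }
          // Pretend every analysis pass finds no eligible targets, so the item
          // is re-queued with an incremented retry count.
          item.retryCount++;
          needed.add(item);
        }
      }
    }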

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
index a769ce9..606f675 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
@@ -272,8 +272,11 @@ public class StoragePolicyAdmin extends Configured implements Tool {
               + " the policy in given path. This will print the current"
               + "status of the path in each 10 sec and status are:\n"
               + "PENDING : Path is in queue and not processed for satisfying"
-              + " the policy.\nIN_PROGRESS : Satisfying the storage policy for"
-              + " path.\nSUCCESS : Storage policy satisfied for the path.\n"
+              + " the policy.\n"
+              + "IN_PROGRESS : Satisfying the storage policy for"
+              + " path.\n"
+              + "SUCCESS : Storage policy satisfied for the path.\n"
+              + "FAILURE : Few blocks failed to move.\n"
               + "NOT_AVAILABLE : Status not available.");
       return getShortUsage() + "\n" +
           "Schedule blocks to move based on file/directory policy.\n\n" +
@@ -305,18 +308,30 @@ public class StoragePolicyAdmin extends Configured implements Tool {
       return 0;
     }
 
-
     private void waitForSatisfyPolicy(DistributedFileSystem dfs, String path)
         throws IOException {
       System.out.println("Waiting for satisfy the policy ...");
-      while (true) {
+      boolean running = true;
+      while (running) {
         StoragePolicySatisfyPathStatus status = dfs.getClient()
             .checkStoragePolicySatisfyPathStatus(path);
-        if (StoragePolicySatisfyPathStatus.SUCCESS.equals(status)) {
+        switch (status) {
+        case SUCCESS:
+        case FAILURE:
+        case NOT_AVAILABLE:
+          System.out.println(status);
+          running = false;
+          break;
+        case PENDING:
+        case IN_PROGRESS:
           System.out.println(status);
+          break;
+        default:
+          System.err.println("Unexpected storage policy satisfier status,"
+              + " Exiting");
+          running = false;
           break;
         }
-        System.out.println(status);
+
         try {
           Thread.sleep(10000);
         } catch (InterruptedException e) {
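
The rewritten wait loop above is what backs the -satisfyStoragePolicy -w option. The same polling can be done from client code against the APIs used in this patch; a minimal sketch, assuming fs.defaultFS points at an HDFS cluster and /cold/data is a hypothetical path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;

    public class SpsWaitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String path = "/cold/data"; // hypothetical path
        try (DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf)) {
          dfs.satisfyStoragePolicy(new Path(path)); // schedule the move
          StoragePolicySatisfyPathStatus status;
          do {
            status = dfs.getClient().checkStoragePolicySatisfyPathStatus(path);
            System.out.println(path + " : " + status);
            Thread.sleep(10000); // the admin tool also polls every 10 seconds
          } while (status == StoragePolicySatisfyPathStatus.PENDING
              || status == StoragePolicySatisfyPathStatus.IN_PROGRESS);
          // SUCCESS, FAILURE, and NOT_AVAILABLE are all terminal for the wait.
        }
      }
    }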

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7b9f795..48ec439 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4384,12 +4384,21 @@
 
 <property>
   <name>dfs.storage.policy.satisfier.low.max-streams.preference</name>
-  <value>false</value>
+  <value>true</value>
   <description>
-    If true, blocks to move tasks will share equal ratio of number of highest-priority
+    If false, blocks to move tasks will share equal ratio of number of highest-priority
     replication streams (dfs.namenode.replication.max-streams) with pending replica and
-    erasure-coded reconstruction tasks. If false, blocks to move tasks will only use
-    the delta number of replication streams. The default value is false.
+    erasure-coded reconstruction tasks. If true, blocks to move tasks will only use
+    the delta number of replication streams. The default value is true.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.retry.max.attempts</name>
+  <value>3</value>
+  <description>
+    Maximum number of retries to satisfy the block storage policy. After these
+    retries, the block will be removed from the movement-needed queue.
   </description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index 93fcb1b..eecb264 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -222,7 +222,7 @@ Schedule blocks to move based on file's/directory's current storage policy.
 | | |
 |:---- |:---- |
 | `-path <path>` | The path referring to either a directory or a file. |
-| `-w` | It requests that the command wait till all the files satisfy the policy in given path. This will print the current status of the path in each 10 sec and status are:<br/>PENDING - Path is in queue and not processed for satisfying the policy.<br/>IN_PROGRESS - Satisfying the storage policy for path.<br/>SUCCESS - Storage policy satisfied for the path.<br/>NOT_AVAILABLE - Status not available. |
+| `-w` | It requests that the command wait till all the files satisfy the policy in given path. This will print the current status of the path in each 10 sec and status are:<br/>PENDING - Path is in queue and not processed for satisfying the policy.<br/>IN_PROGRESS - Satisfying the storage policy for path.<br/>SUCCESS - Storage policy satisfied for the path.<br/>FAILURE - Few blocks failed to move.<br/>NOT_AVAILABLE - Status not available. |
 
 ### SPS Running Status
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index f79326f..d4ccb3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -91,7 +91,7 @@ public class TestBlockStorageMovementAttemptedItems {
     Long item = new Long(1234);
     List<Block> blocks = new ArrayList<Block>();
     blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
     Block[] blockArray = new Block[blocks.size()];
     blocks.toArray(blockArray);
     bsmAttemptedItems.addReportedMovedBlocks(blockArray);
@@ -108,7 +108,7 @@ public class TestBlockStorageMovementAttemptedItems {
     Long item = new Long(1234);
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
     assertEquals("Shouldn't receive result", 0,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
     assertEquals("Item doesn't exist in the attempted list", 1,
@@ -129,7 +129,7 @@ public class TestBlockStorageMovementAttemptedItems {
     blocks.add(new Block(5678L));
     Long trackID = 0L;
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
@@ -154,7 +154,7 @@ public class TestBlockStorageMovementAttemptedItems {
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
@@ -182,7 +182,7 @@ public class TestBlockStorageMovementAttemptedItems {
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04ed6dd1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index edd1aca..9f733ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -1412,8 +1412,8 @@ public class TestStoragePolicySatisfier {
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
       config.setBoolean(DFSConfigKeys
-          .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
-          true);
+          .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
+          false);
 
       StorageType[][] storagetypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
@@ -1474,8 +1474,8 @@ public class TestStoragePolicySatisfier {
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
       config.setBoolean(DFSConfigKeys
-          .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
-          true);
+          .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
+          false);
 
       StorageType[][] storagetypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
@@ -1531,6 +1531,55 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testMaxRetryForFailedBlock() throws Exception {
+    try {
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "1000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "1000");
+      StorageType[][] storagetypes = new StorageType[][] {
+          {StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK}};
+      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
+          .storageTypes(storagetypes).build();
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
+
+      Path filePath = new Path("/retryFile");
+      DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE, (short) 2,
+          0);
+
+      dfs.setStoragePolicy(filePath, "COLD");
+      dfs.satisfyStoragePolicy(filePath);
+      Thread.sleep(3000
+          * DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.DISK, 2, 60000, hdfsCluster.getFileSystem());
+      // Path status should be FAILURE
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            StoragePolicySatisfyPathStatus status = dfs.getClient()
+                .checkStoragePolicySatisfyPathStatus(filePath.toString());
+            return StoragePolicySatisfyPathStatus.FAILURE.equals(status);
+          } catch (IOException e) {
+            Assert.fail("Fail to get path status for sps");
+          }
+          return false;
+        }
+      }, 100, 90000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private static void createDirectoryTree(DistributedFileSystem dfs)
       throws Exception {
     // tree structure

