HDFS-12982: [SPS]: Reduce the locking and clean up the Namesystem access.
Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/113185ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/113185ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/113185ee

Branch: refs/heads/HDFS-10285
Commit: 113185eec03d6d107ac19d267741c760aaef8f9c
Parents: e36384a
Author: Surendra Singh Lilhore <surendralilh...@apache.org>
Authored: Mon Jan 8 15:13:11 2018 +0530
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Tue Jul 31 12:10:10 2018 +0530

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |  16 +-
 .../blockmanagement/DatanodeDescriptor.java     |   2 +-
 .../server/blockmanagement/DatanodeManager.java |  22 ++
 .../server/namenode/FSDirStatAndListingOp.java  |   1 +
 .../hdfs/server/namenode/FSNamesystem.java      |  44 ++-
 .../hdfs/server/namenode/IntraNNSPSContext.java |  41 --
 .../hadoop/hdfs/server/namenode/Namesystem.java |  24 ++
 .../sps/BlockStorageMovementAttemptedItems.java |  17 +-
 .../sps/BlockStorageMovementNeeded.java         |  48 ++-
 .../hdfs/server/namenode/sps/Context.java       | 181 +++++++++
 .../namenode/sps/IntraSPSNameNodeContext.java   | 220 +++++++++++
 .../namenode/sps/StoragePolicySatisfier.java    | 374 +++++++++----------
 .../TestBlockStorageMovementAttemptedItems.java |  17 +-
 .../sps/TestStoragePolicySatisfier.java         |  25 +-
 14 files changed, 742 insertions(+), 290 deletions(-)
----------------------------------------------------------------------
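
For orientation before the per-file diffs: this change replaces the old StoragePolicySatisfier.Context/IntraNNSPSContext pair with a richer sps.Context interface plus an IntraSPSNameNodeContext implementation, and the satisfier now reaches the NameNode only through that context. Below is a minimal, self-contained sketch of that wiring using simplified stand-in types (SpsContext, SatisfierSketch) rather than the real classes, to make the shape of the refactoring easier to follow:

// Illustrative stand-ins only; the real types live in
// org.apache.hadoop.hdfs.server.namenode.sps and expose a much larger API.
interface SpsContext {
  boolean isRunning();       // NameNode alive and SPS still enabled
  boolean isInSafeMode();    // pause work while the NameNode is in safe mode
  int getNumLiveDataNodes(); // example of a query answered via the context
}

class SatisfierSketch implements Runnable {
  private final SpsContext ctxt;

  SatisfierSketch(SpsContext ctxt) {
    this.ctxt = ctxt;        // the only handle back to the NameNode
  }

  @Override
  public void run() {
    while (ctxt.isRunning()) {
      try {
        if (!ctxt.isInSafeMode()) {
          // ... dequeue an inode, analyse its blocks, assign movements ...
        }
        Thread.sleep(3000);  // illustrative pacing only
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}

The real wiring appears in the BlockManager.java hunk below: spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf); sps = new StoragePolicySatisfier(spsctxt);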


http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1cf687e..c2d5162 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,11 +89,12 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
-import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -433,6 +434,7 @@ public class BlockManager implements BlockStatsMXBean {
   private final StoragePolicySatisfier sps;
   private final boolean storagePolicyEnabled;
   private boolean spsEnabled;
+  private Context spsctxt = null;
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
    */
@@ -479,8 +481,8 @@ public class BlockManager implements BlockStatsMXBean {
         conf.getBoolean(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
-    StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
-    sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);
+    spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf);
+    sps = new StoragePolicySatisfier(spsctxt);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -5031,8 +5033,8 @@ public class BlockManager implements BlockStatsMXBean {
       LOG.info("Storage policy satisfier is already running.");
       return;
     }
-
-    sps.start(false);
+    // TODO: FSDirectory will get removed via HDFS-12911 modularization work
+    sps.start(false, namesystem.getFSDirectory());
   }
 
   /**
@@ -5068,8 +5070,8 @@ public class BlockManager implements BlockStatsMXBean {
       LOG.info("Storage policy satisfier is already running.");
       return;
     }
-
-    sps.start(true);
+    // TODO: FSDirectory will get removed via HDFS-12911 modularization work
+    sps.start(true, namesystem.getFSDirectory());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index f9a76b4..b09d908 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -802,7 +802,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /** Increment the number of blocks scheduled. */
-  void incrementBlocksScheduled(StorageType t) {
+  public void incrementBlocksScheduled(StorageType t) {
     currApproxBlocksScheduled.add(t, 1);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 6aab5e9..c24a38b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@@ -2045,5 +2046,26 @@ public class DatanodeManager {
       }
     }
   }
+
+  /**
+   * Generates datanode reports for the given report type.
+   *
+   * @param type
+   *          type of the datanode report
+   * @return array of DatanodeStorageReports
+   */
+  public DatanodeStorageReport[] getDatanodeStorageReport(
+      DatanodeReportType type) {
+    final List<DatanodeDescriptor> datanodes = getDatanodeListForReport(type);
+
+    DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes
+        .size()];
+    for (int i = 0; i < reports.length; i++) {
+      final DatanodeDescriptor d = datanodes.get(i);
+      reports[i] = new DatanodeStorageReport(
+          new DatanodeInfoBuilder().setFrom(d).build(), d.getStorageReports());
+    }
+    return reports;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 7e22ae1..709e270 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -90,6 +90,7 @@ class FSDirStatAndListingOp {
    * @param srcArg The string representation of the path to the file
    * @param resolveLink whether to throw UnresolvedLinkException
    *        if src refers to a symlink
+   * @param needLocation if blockLocations need to be returned
    *
    * @param needLocation Include {@link LocatedBlocks} in result.
    * @param needBlockToken Include block tokens in {@link LocatedBlocks}.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 3f6e4b3..25a45c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3133,6 +3133,29 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @param src The string representation of the path to the file
    * @param resolveLink whether to throw UnresolvedLinkException
    *        if src refers to a symlink
+   * @param needLocation if blockLocations need to be returned
+   *
+   * @throws AccessControlException
+   *           if access is denied
+   * @throws UnresolvedLinkException
+   *           if a symlink is encountered.
+   *
+   * @return object containing information regarding the file or null if file
+   *         not found
+   * @throws StandbyException
+   */
+  @Override
+  public HdfsFileStatus getFileInfo(final String src, boolean resolveLink,
+      boolean needLocation) throws IOException {
+    return getFileInfo(src, resolveLink, needLocation, false);
+  }
+
+  /**
+   * Get the file info for a specific file.
+   *
+   * @param src The string representation of the path to the file
+   * @param resolveLink whether to throw UnresolvedLinkException
+   *        if src refers to a symlink
    *
    * @param needLocation Include {@link LocatedBlocks} in result.
    * @param needBlockToken Include block tokens in {@link LocatedBlocks}
@@ -3167,6 +3190,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return stat;
   }
 
+  @Override
+  public String getFilePath(Long inodeId) {
+    readLock();
+    try {
+      INode inode = getFSDirectory().getInode(inodeId);
+      return inode == null ? null : inode.getFullPathName();
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Returns true if the file is closed
    */
@@ -4459,15 +4493,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(OperationCategory.UNCHECKED);
       final DatanodeManager dm = getBlockManager().getDatanodeManager();      
-      final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
-
-      reports = new DatanodeStorageReport[datanodes.size()];
-      for (int i = 0; i < reports.length; i++) {
-        final DatanodeDescriptor d = datanodes.get(i);
-        reports[i] = new DatanodeStorageReport(
-            new DatanodeInfoBuilder().setFrom(d).build(),
-            d.getStorageReports());
-      }
+      reports = dm.getDatanodeStorageReport(type);
     } finally {
       readUnlock("getDatanodeStorageReport");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
deleted file mode 100644
index 111cabb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
-
-/**
- * This class is the Namenode implementation for analyzing the file blocks which
- * are expecting to change its storages and assigning the block storage
- * movements to satisfy the storage policy.
- */
-// TODO: Now, added one API which is required for sps package. Will refine
-// this interface via HDFS-12911.
-public class IntraNNSPSContext implements StoragePolicySatisfier.Context {
-  private final Namesystem namesystem;
-
-  public IntraNNSPSContext(Namesystem namesystem) {
-    this.namesystem = namesystem;
-  }
-
-  @Override
-  public int getNumLiveDataNodes() {
-    return namesystem.getFSDirectory().getBlockManager().getDatanodeManager()
-        .getNumLiveDataNodes();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index e58fa72..fc933b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.util.RwLock;
@@ -62,4 +63,27 @@ public interface Namesystem extends RwLock, SafeMode {
    * @throws IOException
    */
   void removeXattr(long id, String xattrName) throws IOException;
+
+  /**
+   * Gets the fileInfo of the given file path.
+   *
+   * @param filePath string representation of the path to the file
+   * @param resolveLink whether to throw UnresolvedLinkException
+   *        if src refers to a symlink
+   * @param needLocation if blockLocations need to be returned
+   *
+   * @return hdfs file status details
+   * @throws IOException
+   */
+  HdfsFileStatus getFileInfo(String filePath, boolean resolveLink,
+      boolean needLocation) throws IOException;
+
+  /**
+   * Gets the file path corresponds to the given file id.
+   *
+   * @param inodeId
+   *          file id
+   * @return string file path
+   */
+  String getFilePath(Long inodeId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index b044f30..1cae027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -25,6 +25,11 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
+
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
@@ -66,15 +71,21 @@ public class BlockStorageMovementAttemptedItems {
   //
   private long minCheckTimeout = 1 * 60 * 1000; // minimum value
   private BlockStorageMovementNeeded blockStorageMovementNeeded;
+  private final Context ctxt;
 
-  public BlockStorageMovementAttemptedItems(long recheckTimeout,
-      long selfRetryTimeout,
+  public BlockStorageMovementAttemptedItems(Context context,
       BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+    this.ctxt = context;
+    long recheckTimeout = ctxt.getConf().getLong(
+        DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+        DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT);
     if (recheckTimeout > 0) {
       this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
     }
 
-    this.selfRetryTimeout = selfRetryTimeout;
+    this.selfRetryTimeout = ctxt.getConf().getLong(
+        DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+        DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
     storageMovementAttemptedItems = new ArrayList<>();
     movementFinishedBlocks = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 5635621..80f1893 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,10 +36,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathSta
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
 import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -73,13 +73,11 @@ public class BlockStorageMovementNeeded {
   private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
       new ConcurrentHashMap<>();
 
-  private final Namesystem namesystem;
+  private final Context ctxt;
 
   // List of pending dir to satisfy the policy
   private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
 
-  private final StoragePolicySatisfier sps;
-
   private Daemon inodeIdCollector;
 
   private final int maxQueuedItem;
@@ -88,11 +86,11 @@ public class BlockStorageMovementNeeded {
   // NOT_AVAILABLE.
   private static long statusClearanceElapsedTimeMs = 300000;
 
-  public BlockStorageMovementNeeded(Namesystem namesystem,
-      StoragePolicySatisfier sps, int queueLimit) {
-    this.namesystem = namesystem;
-    this.sps = sps;
-    this.maxQueuedItem = queueLimit;
+  public BlockStorageMovementNeeded(Context context) {
+    this.ctxt = context;
+    this.maxQueuedItem = ctxt.getConf().getInt(
+                  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+                  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
   }
 
   /**
@@ -188,8 +186,7 @@ public class BlockStorageMovementNeeded {
       // If track is part of some start inode then reduce the pending
       // directory work count.
       long startId = trackInfo.getStartId();
-      INode inode = namesystem.getFSDirectory().getInode(startId);
-      if (inode == null) {
+      if (!ctxt.isFileExist(startId)) {
         // directory deleted just remove it.
         this.pendingWorkForDirectory.remove(startId);
         updateStatus(startId, isSuccess);
@@ -198,7 +195,7 @@ public class BlockStorageMovementNeeded {
         if (pendingWork != null) {
           pendingWork.decrementPendingWorkCount();
           if (pendingWork.isDirWorkDone()) {
-            namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
+            ctxt.removeSPSHint(startId);
             pendingWorkForDirectory.remove(startId);
             pendingWork.setFailure(!isSuccess);
             updateStatus(startId, pendingWork.isPolicySatisfied());
@@ -209,8 +206,7 @@ public class BlockStorageMovementNeeded {
     } else {
       // Remove xAttr if trackID doesn't exist in
       // storageMovementAttemptedItems or file policy satisfied.
-      namesystem.removeXattr(trackInfo.getTrackId(),
-          XATTR_SATISFY_STORAGE_POLICY);
+      ctxt.removeSPSHint(trackInfo.getTrackId());
       updateStatus(trackInfo.getStartId(), isSuccess);
     }
   }
@@ -256,7 +252,7 @@ public class BlockStorageMovementNeeded {
     while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
       try {
         // Remove xAttr for file
-        namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
+        ctxt.removeSPSHint(trackId);
       } catch (IOException ie) {
         LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
       }
@@ -269,8 +265,7 @@ public class BlockStorageMovementNeeded {
       try {
         // Remove xAttr for file
         if (!itemInfo.isDir()) {
-          namesystem.removeXattr(itemInfo.getTrackId(),
-              XATTR_SATISFY_STORAGE_POLICY);
+          ctxt.removeSPSHint(itemInfo.getTrackId());
         }
       } catch (IOException ie) {
         LOG.warn(
@@ -300,10 +295,9 @@ public class BlockStorageMovementNeeded {
     public void run() {
       LOG.info("Starting FileInodeIdCollector!.");
       long lastStatusCleanTime = 0;
-      while (namesystem.isRunning() && sps.isRunning()) {
+      while (ctxt.isRunning()) {
         try {
-          if (!namesystem.isInSafeMode()) {
-            FSDirectory fsd = namesystem.getFSDirectory();
+          if (!ctxt.isInSafeMode()) {
             Long startINodeId = spsDirsToBeTraveresed.poll();
             if (startINodeId == null) {
               // Waiting for SPS path
@@ -311,7 +305,7 @@ public class BlockStorageMovementNeeded {
                 spsDirsToBeTraveresed.wait(5000);
               }
             } else {
-              INode startInode = fsd.getInode(startINodeId);
+              INode startInode = getFSDirectory().getInode(startINodeId);
               if (startInode != null) {
                 try {
                   remainingCapacity = remainingCapacity();
@@ -333,8 +327,7 @@ public class BlockStorageMovementNeeded {
                 DirPendingWorkInfo dirPendingWorkInfo =
                     pendingWorkForDirectory.get(startInode.getId());
                 if (dirPendingWorkInfo.isDirWorkDone()) {
-                  namesystem.removeXattr(startInode.getId(),
-                      XATTR_SATISFY_STORAGE_POLICY);
+                  ctxt.removeSPSHint(startInode.getId());
                   pendingWorkForDirectory.remove(startInode.getId());
                   updateStatus(startInode.getId(), true);
                 }
@@ -483,9 +476,10 @@ public class BlockStorageMovementNeeded {
     }
   }
 
-  public void init() {
+  // TODO: FSDirectory will get removed via HDFS-12911 modularization work
+  public void init(FSDirectory fsd) {
     inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
-        namesystem.getFSDirectory()));
+        fsd));
     inodeIdCollector.setName("FileInodeIdCollector");
     inodeIdCollector.start();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
new file mode 100644
index 0000000..d11e26f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessControlException;
+
+/**
+ * An interface for the communication between NameNode and SPS module.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface Context {
+
+  /**
+   * Returns configuration object.
+   */
+  Configuration getConf();
+
+  /**
+   * Returns true if the SPS is running, false otherwise.
+   */
+  boolean isRunning();
+
+  /**
+   * Update the SPS running status.
+   *
+   * @param isSpsRunning
+   *          true represents running, false otherwise
+   */
+  void setSPSRunning(Supplier<Boolean> isSpsRunning);
+
+  /**
+   * Returns true if the Namenode in safe mode, false otherwise.
+   */
+  boolean isInSafeMode();
+
+  /**
+   * Returns true if Mover tool is already running, false otherwise.
+   */
+  boolean isMoverRunning();
+
+  /**
+   * Gets the Inode ID number for the given path.
+   *
+   * @param path
+   *          - file/dir path
+   * @return Inode id number
+   */
+  long getFileID(String path) throws UnresolvedLinkException,
+      AccessControlException, ParentNotDirectoryException;
+
+  /**
+   * Gets the network topology.
+   *
+   * @return network topology
+   */
+  NetworkTopology getNetworkTopology();
+
+  /**
+   * Returns true if the give Inode exists in the Namespace.
+   *
+   * @param inodeId
+   *          - Inode ID
+   * @return true if Inode exists, false otherwise.
+   */
+  boolean isFileExist(long inodeId);
+
+  /**
+   * Gets the storage policy details for the given policy ID.
+   *
+   * @param policyId
+   *          - Storage policy ID
+   * @return the detailed policy object
+   */
+  BlockStoragePolicy getStoragePolicy(byte policyId);
+
+  /**
+   * Drop the SPS work in case if any previous work queued up.
+   */
+  void addDropPreviousSPSWorkAtDNs();
+
+  /**
+   * Remove the hint which was added to track SPS call.
+   *
+   * @param inodeId
+   *          - Inode ID
+   * @throws IOException
+   */
+  void removeSPSHint(long inodeId) throws IOException;
+
+  /**
+   * Gets the number of live datanodes in the cluster.
+   *
+   * @return number of live datanodes
+   */
+  int getNumLiveDataNodes();
+
+  /**
+   * Get the file info for a specific file.
+   *
+   * @param inodeID
+   *          inode identifier
+   * @return file status metadata information
+   */
+  HdfsFileStatus getFileInfo(long inodeID) throws IOException;
+
+  /**
+   * Returns all the live datanodes and its storage details.
+   *
+   * @throws IOException
+   */
+  DatanodeStorageReport[] getLiveDatanodeStorageReport()
+      throws IOException;
+
+  /**
+   * Returns true if the given inode file has low redundancy blocks.
+   *
+   * @param inodeID
+   *          inode identifier
+   * @return true if block collection has low redundancy blocks
+   */
+  boolean hasLowRedundancyBlocks(long inodeID);
+
+  /**
+   * Assign the given block movement task to the target node present in
+   * {@link BlockMovingInfo}.
+   *
+   * @param blkMovingInfo
+   *          block to storage info
+   * @throws IOException
+   */
+  void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
+      throws IOException;
+
+  /**
+   * Checks whether the given datanode has sufficient space to occupy the given
+   * blockSize data.
+   *
+   * @param dn
+   *          datanode info
+   * @param type
+   *          storage type
+   * @param blockSize
+   *          blockSize to be scheduled
+   * @return true if the given datanode has sufficient space to occupy blockSize
+   *         data, false otherwise.
+   */
+  boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
+      StorageType type, long blockSize);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
new file mode 100644
index 0000000..6654212
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessControlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the Namenode implementation for analyzing the file blocks which
+ * are expecting to change its storages and assigning the block storage
+ * movements to satisfy the storage policy.
+ */
+public class IntraSPSNameNodeContext implements Context {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(IntraSPSNameNodeContext.class);
+
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final Configuration conf;
+  private Supplier<Boolean> isSpsRunning;
+
+  public IntraSPSNameNodeContext(Namesystem namesystem,
+      BlockManager blockManager, Configuration conf) {
+    this.namesystem = namesystem;
+    this.blockManager = blockManager;
+    this.conf = conf;
+    isSpsRunning = () -> false;
+  }
+
+  @Override
+  public int getNumLiveDataNodes() {
+    return blockManager.getDatanodeManager().getNumLiveDataNodes();
+  }
+
+  @Override
+  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+    String filePath = namesystem.getFilePath(inodeID);
+    if (StringUtils.isBlank(filePath)) {
+      LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
+      return null;
+    }
+    HdfsFileStatus fileInfo = null;
+    try {
+      fileInfo = namesystem.getFileInfo(filePath, true, true);
+    } catch (IOException e) {
+      LOG.debug("File path:{} doesn't exists!", filePath);
+    }
+    return fileInfo;
+  }
+
+  @Override
+  public DatanodeStorageReport[] getLiveDatanodeStorageReport()
+      throws IOException {
+    namesystem.readLock();
+    try {
+      return blockManager.getDatanodeManager()
+          .getDatanodeStorageReport(DatanodeReportType.LIVE);
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  @Override
+  public boolean hasLowRedundancyBlocks(long inodeID) {
+    namesystem.readLock();
+    try {
+      BlockCollection bc = namesystem.getBlockCollection(inodeID);
+      return blockManager.hasLowRedundancyBlocks(bc);
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public boolean isFileExist(long inodeId) {
+    return namesystem.getFSDirectory().getInode(inodeId) != null;
+  }
+
+  @Override
+  public void removeSPSHint(long inodeId) throws IOException {
+    this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
+  }
+
+  @Override
+  public boolean isRunning() {
+    // TODO : 'isSpsRunning' flag has been added to avoid the NN lock inside
+    // SPS. Context interface will be further refined as part of HDFS-12911
+    // modularization task. One idea is to introduce a cleaner interface similar
+    // to Namesystem for better abstraction.
+    return namesystem.isRunning() && isSpsRunning.get();
+  }
+
+  @Override
+  public void setSPSRunning(Supplier<Boolean> spsRunningFlag) {
+    this.isSpsRunning = spsRunningFlag;
+  }
+
+  @Override
+  public boolean isInSafeMode() {
+    return namesystem.isInSafeMode();
+  }
+
+  @Override
+  public boolean isMoverRunning() {
+    String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
+    return namesystem.isFileOpenedForWrite(moverId);
+  }
+
+  @Override
+  public void addDropPreviousSPSWorkAtDNs() {
+    namesystem.readLock();
+    try {
+      blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  @Override
+  public BlockStoragePolicy getStoragePolicy(byte policyID) {
+    return blockManager.getStoragePolicy(policyID);
+  }
+
+  @Override
+  public NetworkTopology getNetworkTopology() {
+    return blockManager.getDatanodeManager().getNetworkTopology();
+  }
+
+  @Override
+  public long getFileID(String path) throws UnresolvedLinkException,
+      AccessControlException, ParentNotDirectoryException {
+    namesystem.readLock();
+    try {
+      INode inode = namesystem.getFSDirectory().getINode(path);
+      return inode == null ? -1 : inode.getId();
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  @Override
+  public void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
+      throws IOException {
+    namesystem.readLock();
+    try {
+      DatanodeDescriptor dn = blockManager.getDatanodeManager()
+          .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
+      if (dn == null) {
+        throw new IOException("Failed to schedule block movement task:"
+            + blkMovingInfo + " as target datanode: "
+            + blkMovingInfo.getTarget() + " doesn't exists");
+      }
+      dn.addBlocksToMoveStorage(blkMovingInfo);
+      dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  @Override
+  public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
+      StorageType type, long blockSize) {
+    namesystem.readLock();
+    try {
+      DatanodeDescriptor datanode = blockManager.getDatanodeManager()
+          .getDatanode(dn.getDatanodeUuid());
+      if (datanode == null) {
+        LOG.debug("Target datanode: " + dn + " doesn't exists");
+        return false;
+      }
+      return null != datanode.chooseStorage4Block(type, blockSize);
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+}
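
Much of the locking reduction comes from the pattern visible in IntraSPSNameNodeContext above: methods such as getLiveDatanodeStorageReport() and hasLowRedundancyBlocks() take and release the namesystem read lock internally, so the satisfier's run loop (next file) no longer brackets its analysis with readLock()/readUnlock(). Below is a stand-alone sketch of that lock-confinement pattern, using a plain ReentrantReadWriteLock and an illustrative field as stand-ins for the real Namesystem lock and DatanodeManager state:

// Illustrative stand-in only; not the real Namesystem/Context API.
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockConfinedContextSketch {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private int liveDataNodes; // stand-in for state owned by the DatanodeManager

  int getNumLiveDataNodes() {
    rwLock.readLock().lock(); // lock acquired and released entirely inside the context
    try {
      return liveDataNodes;
    } finally {
      rwLock.readLock().unlock();
    }
  }
}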

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 0d4bb19..b3e6b78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -29,29 +29,28 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Matcher;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
@@ -79,8 +78,6 @@ public class StoragePolicySatisfier implements Runnable {
   public static final Logger LOG =
       LoggerFactory.getLogger(StoragePolicySatisfier.class);
   private Daemon storagePolicySatisfierThread;
-  private final Namesystem namesystem;
-  private final BlockManager blockManager;
   private final BlockStorageMovementNeeded storageMovementNeeded;
   private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
   private volatile boolean isRunning = false;
@@ -90,16 +87,6 @@ public class StoragePolicySatisfier implements Runnable {
   private final Context ctxt;
 
   /**
-   * An interface for analyzing and assigning the block storage movements to
-   * worker nodes.
-   */
-  // TODO: Now, added one API which is required for sps package. Will refine
-  // this interface via HDFS-12911.
-  public interface Context {
-    int getNumLiveDataNodes();
-  }
-
-  /**
    * Represents the collective analysis status for all blocks.
    */
   private static class BlocksMovingAnalysis {
@@ -124,7 +111,9 @@ public class StoragePolicySatisfier implements Runnable {
       BLOCKS_TARGET_PAIRING_SKIPPED,
       // Represents that, All the reported blocks are satisfied the policy but
       // some of the blocks are low redundant.
-      FEW_LOW_REDUNDANCY_BLOCKS
+      FEW_LOW_REDUNDANCY_BLOCKS,
+      // Represents that, movement failures due to unexpected errors.
+      BLOCKS_FAILED_TO_MOVE
     }
 
     private Status status = null;
@@ -136,36 +125,27 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  public StoragePolicySatisfier(final Namesystem namesystem,
-      final BlockManager blkManager, Configuration conf, Context ctxt) {
-    this.namesystem = namesystem;
-    this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
-        this, conf.getInt(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
-    this.blockManager = blkManager;
-    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
-        conf.getLong(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT),
-        conf.getLong(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
+  public StoragePolicySatisfier(Context ctxt) {
+    this.ctxt = ctxt;
+    this.storageMovementNeeded = new BlockStorageMovementNeeded(ctxt);
+    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(ctxt,
         storageMovementNeeded);
-    this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
-    this.blockMovementMaxRetry = conf.getInt(
+    this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(ctxt.getConf());
+    this.blockMovementMaxRetry = ctxt.getConf().getInt(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
-    this.ctxt = ctxt;
   }
 
   /**
    * Start storage policy satisfier demon thread. Also start block storage
    * movements monitor for retry the attempts if needed.
+   *
+   * // TODO: FSDirectory will get removed via HDFS-12911 modularization work.
    */
-  public synchronized void start(boolean reconfigStart) {
+  public synchronized void start(boolean reconfigStart, FSDirectory fsd) {
     isRunning = true;
-    if (checkIfMoverRunning()) {
+    ctxt.setSPSRunning(this::isRunning);
+    if (ctxt.isMoverRunning()) {
       isRunning = false;
       LOG.error(
           "Stopping StoragePolicySatisfier thread " + "as Mover ID file "
@@ -183,7 +163,7 @@ public class StoragePolicySatisfier implements Runnable {
     // Ensure that all the previously submitted block movements(if any) have to
     // be stopped in all datanodes.
     addDropSPSWorkCommandsToAllDNs();
-    storageMovementNeeded.init();
+    storageMovementNeeded.init(fsd);
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -199,7 +179,6 @@ public class StoragePolicySatisfier implements Runnable {
    */
   public synchronized void disable(boolean forceStop) {
     isRunning = false;
-
     if (storagePolicySatisfierThread == null) {
       return;
     }
@@ -242,25 +221,19 @@ public class StoragePolicySatisfier implements Runnable {
     return isRunning;
   }
 
-  // Return true if a Mover instance is running
-  private boolean checkIfMoverRunning() {
-    String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
-    return namesystem.isFileOpenedForWrite(moverId);
-  }
-
   /**
    * Adding drop commands to all datanodes to stop performing the satisfier
    * block movements, if any.
    */
   private void addDropSPSWorkCommandsToAllDNs() {
-    this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+    ctxt.addDropPreviousSPSWorkAtDNs();
   }
 
   @Override
   public void run() {
-    while (namesystem.isRunning() && isRunning) {
+    while (ctxt.isRunning()) {
       try {
-        if (!namesystem.isInSafeMode()) {
+        if (!ctxt.isInSafeMode()) {
           ItemInfo itemInfo = storageMovementNeeded.get();
           if (itemInfo != null) {
             if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
@@ -271,25 +244,28 @@ public class StoragePolicySatisfier implements Runnable {
               continue;
             }
             long trackId = itemInfo.getTrackId();
-            BlockCollection blockCollection;
             BlocksMovingAnalysis status = null;
-            try {
-              namesystem.readLock();
-              blockCollection = namesystem.getBlockCollection(trackId);
-              // Check blockCollectionId existence.
-              if (blockCollection == null) {
-                // File doesn't exists (maybe got deleted), remove trackId from
-                // the queue
-                storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
-              } else {
-                status =
-                    analyseBlocksStorageMovementsAndAssignToDN(
-                        blockCollection);
-              }
-            } finally {
-              namesystem.readUnlock();
-            }
-            if (blockCollection != null) {
+            DatanodeStorageReport[] liveDnReports;
+            BlockStoragePolicy existingStoragePolicy;
+            // TODO: presently, context internally acquire the lock
+            // and returns the result. Need to discuss to move the lock outside?
+            boolean hasLowRedundancyBlocks = ctxt
+                .hasLowRedundancyBlocks(trackId);
+            HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
+            // Check path existence.
+            if (fileStatus == null || fileStatus.isDir()) {
+              // File doesn't exists (maybe got deleted) or its a directory,
+              // just remove trackId from the queue
+              storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
+            } else {
+              liveDnReports = ctxt.getLiveDatanodeStorageReport();
+              byte existingStoragePolicyID = fileStatus.getStoragePolicy();
+              existingStoragePolicy = ctxt
+                  .getStoragePolicy(existingStoragePolicyID);
+
+              HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
+              status = analyseBlocksStorageMovementsAndAssignToDN(file,
+                  hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports);
               switch (status.status) {
               // Just add to monitor, so it will be retried after timeout
               case ANALYSIS_SKIPPED_FOR_RETRY:
@@ -317,6 +293,14 @@ public class StoragePolicySatisfier implements Runnable {
                 }
                 this.storageMovementNeeded.add(itemInfo);
                 break;
+              case BLOCKS_FAILED_TO_MOVE:
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Adding trackID " + trackId
+                      + " back to retry queue as some of the blocks"
+                      + " movement failed.");
+                }
+                this.storageMovementNeeded.add(itemInfo);
+                break;
               // Just clean Xattrs
               case BLOCKS_TARGET_PAIRING_SKIPPED:
               case BLOCKS_ALREADY_SATISFIED:
@@ -350,14 +334,11 @@ public class StoragePolicySatisfier implements Runnable {
           // Stopping monitor thread and clearing queues as well
           this.clearQueues();
           this.storageMovementsMonitor.stopGracefully();
-          if (!namesystem.isRunning()) {
-            LOG.info("Stopping StoragePolicySatisfier.");
-            if (!(t instanceof InterruptedException)) {
-              LOG.info("StoragePolicySatisfier received an exception"
-                  + " while shutting down.", t);
-            }
-            return;
+          if (!(t instanceof InterruptedException)) {
+            LOG.info("StoragePolicySatisfier received an exception"
+                + " while shutting down.", t);
           }
+          LOG.info("Stopping StoragePolicySatisfier.");
         }
       }
     }
@@ -367,41 +348,43 @@ public class StoragePolicySatisfier implements Runnable {
   }
 
   private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
-      BlockCollection blockCollection) {
+      HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks,
+      BlockStoragePolicy existingStoragePolicy,
+      DatanodeStorageReport[] liveDns) {
     BlocksMovingAnalysis.Status status =
         BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
-    byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
-    BlockStoragePolicy existingStoragePolicy =
-        blockManager.getStoragePolicy(existingStoragePolicyID);
-    if (!blockCollection.getLastBlock().isComplete()) {
+    final ErasureCodingPolicy ecPolicy = fileInfo.getErasureCodingPolicy();
+    final LocatedBlocks locatedBlocks = fileInfo.getLocatedBlocks();
+    final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
+    if (!lastBlkComplete) {
       // Postpone, currently file is under construction
       // So, should we add back? or leave it to user
       LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
-          + " this to the next retry iteration", blockCollection.getId());
+          + " this to the next retry iteration", fileInfo.getFileId());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
           new ArrayList<>());
     }
 
-    BlockInfo[] blocks = blockCollection.getBlocks();
-    if (blocks.length == 0) {
+    List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
+    if (blocks.size() == 0) {
       LOG.info("BlockCollectionID: {} file is not having any blocks."
-          + " So, skipping the analysis.", blockCollection.getId());
+          + " So, skipping the analysis.", fileInfo.getFileId());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
           new ArrayList<>());
     }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
 
-    for (int i = 0; i < blocks.length; i++) {
-      BlockInfo blockInfo = blocks[i];
+    for (int i = 0; i < blocks.size(); i++) {
+      LocatedBlock blockInfo = blocks.get(i);
       List<StorageType> expectedStorageTypes;
       if (blockInfo.isStriped()) {
         if (ErasureCodingPolicyManager
             .checkStoragePolicySuitableForECStripedMode(
-                existingStoragePolicyID)) {
+                existingStoragePolicy.getId())) {
           expectedStorageTypes = existingStoragePolicy
-              .chooseStorageTypes((short) blockInfo.getCapacity());
+              .chooseStorageTypes((short) blockInfo.getLocations().length);
         } else {
           // Currently we support only limited policies (HOT, COLD, ALLSSD)
           // for EC striped mode files. SPS will ignore to move the blocks if
@@ -415,22 +398,16 @@ public class StoragePolicySatisfier implements Runnable {
         }
       } else {
         expectedStorageTypes = existingStoragePolicy
-            .chooseStorageTypes(blockInfo.getReplication());
+            .chooseStorageTypes(fileInfo.getReplication());
       }
 
-      DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
-      StorageType[] storageTypes = new StorageType[storages.length];
-      for (int j = 0; j < storages.length; j++) {
-        DatanodeStorageInfo datanodeStorageInfo = storages[j];
-        StorageType storageType = datanodeStorageInfo.getStorageType();
-        storageTypes[j] = storageType;
-      }
-      List<StorageType> existing =
-          new LinkedList<StorageType>(Arrays.asList(storageTypes));
+      List<StorageType> existing = new LinkedList<StorageType>(
+          Arrays.asList(blockInfo.getStorageTypes()));
       if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
           existing, true)) {
         boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
-            blockInfo, expectedStorageTypes, existing, storages);
+            blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(),
+            liveDns, ecPolicy);
         if (blocksPaired) {
           status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
         } else {
@@ -439,7 +416,7 @@ public class StoragePolicySatisfier implements Runnable {
           status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
         }
       } else {
-        if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
+        if (hasLowRedundancyBlocks) {
           status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
         }
       }
@@ -448,13 +425,15 @@ public class StoragePolicySatisfier implements Runnable {
     List<Block> assignedBlockIds = new ArrayList<Block>();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for at least one block storage movement has been chosen
-      if (blkMovingInfo.getTarget() != null) {
-        // assign block storage movement task to the target node
-        ((DatanodeDescriptor) blkMovingInfo.getTarget())
-            .addBlocksToMoveStorage(blkMovingInfo);
+      try {
+        ctxt.assignBlockMoveTaskToTargetNode(blkMovingInfo);
         LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
         assignedBlockIds.add(blkMovingInfo.getBlock());
         blockCount++;
+      } catch (IOException e) {
+        LOG.warn("Exception while scheduling movement task", e);
+        // failed to move the block.
+        status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
       }
     }
     return new BlocksMovingAnalysis(status, assignedBlockIds);
@@ -481,29 +460,29 @@ public class StoragePolicySatisfier implements Runnable {
    *         satisfy the storage policy, true otherwise
    */
   private boolean computeBlockMovingInfos(
-      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+      List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
       List<StorageType> expectedStorageTypes, List<StorageType> existing,
-      DatanodeStorageInfo[] storages) {
+      DatanodeInfo[] storages, DatanodeStorageReport[] liveDns,
+      ErasureCodingPolicy ecPolicy) {
     boolean foundMatchingTargetNodesForBlock = true;
     if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
         existing, true)) {
       List<StorageTypeNodePair> sourceWithStorageMap =
           new ArrayList<StorageTypeNodePair>();
-      List<DatanodeStorageInfo> existingBlockStorages =
-          new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+      List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
+          Arrays.asList(storages));
       // if expected type exists in source node already, local movement would be
       // possible, so lets find such sources first.
-      Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
+      Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
       while (iterator.hasNext()) {
-        DatanodeStorageInfo datanodeStorageInfo = iterator.next();
-        if (checkSourceAndTargetTypeExists(
-            datanodeStorageInfo.getDatanodeDescriptor(), existing,
-            expectedStorageTypes)) {
+        DatanodeInfoWithStorage dnInfo = (DatanodeInfoWithStorage) iterator
+            .next();
+        if (checkSourceAndTargetTypeExists(dnInfo, existing,
+            expectedStorageTypes, liveDns)) {
           sourceWithStorageMap
-              .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
-                  datanodeStorageInfo.getDatanodeDescriptor()));
+              .add(new StorageTypeNodePair(dnInfo.getStorageType(), dnInfo));
           iterator.remove();
-          existing.remove(datanodeStorageInfo.getStorageType());
+          existing.remove(dnInfo.getStorageType());
         }
       }
 
@@ -511,23 +490,25 @@ public class StoragePolicySatisfier implements Runnable {
       for (StorageType existingType : existing) {
         iterator = existingBlockStorages.iterator();
         while (iterator.hasNext()) {
-          DatanodeStorageInfo datanodeStorageInfo = iterator.next();
-          StorageType storageType = datanodeStorageInfo.getStorageType();
+          DatanodeInfoWithStorage dnStorageInfo =
+              (DatanodeInfoWithStorage) iterator.next();
+          StorageType storageType = dnStorageInfo.getStorageType();
           if (storageType == existingType) {
             iterator.remove();
             sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
-                datanodeStorageInfo.getDatanodeDescriptor()));
+                dnStorageInfo));
             break;
           }
         }
       }
 
       StorageTypeNodeMap locsForExpectedStorageTypes =
-          findTargetsForExpectedStorageTypes(expectedStorageTypes);
+          findTargetsForExpectedStorageTypes(expectedStorageTypes, liveDns);
 
       foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
           blockMovingInfos, blockInfo, sourceWithStorageMap,
-          expectedStorageTypes, locsForExpectedStorageTypes);
+          expectedStorageTypes, locsForExpectedStorageTypes,
+          ecPolicy);
     }
     return foundMatchingTargetNodesForBlock;
   }
@@ -550,12 +531,13 @@ public class StoragePolicySatisfier implements Runnable {
    *         satisfy the storage policy
    */
   private boolean findSourceAndTargetToMove(
-      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+      List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
       List<StorageTypeNodePair> sourceWithStorageList,
       List<StorageType> expected,
-      StorageTypeNodeMap locsForExpectedStorageTypes) {
+      StorageTypeNodeMap locsForExpectedStorageTypes,
+      ErasureCodingPolicy ecPolicy) {
     boolean foundMatchingTargetNodesForBlock = true;
-    List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
+    List<DatanodeInfo> excludeNodes = new ArrayList<>();
 
     // Looping over all the source node locations and choose the target
     // storage within same node if possible. This is done separately to
@@ -566,13 +548,14 @@ public class StoragePolicySatisfier implements Runnable {
       // Check whether the block replica is already placed in the expected
       // storage type in this source datanode.
       if (!expected.contains(existingTypeNodePair.storageType)) {
-        StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
-            blockInfo, existingTypeNodePair.dn, expected);
+        StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
+            existingTypeNodePair.dn, expected);
         if (chosenTarget != null) {
           if (blockInfo.isStriped()) {
             buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
                 existingTypeNodePair.storageType, chosenTarget.dn,
-                chosenTarget.storageType, blockMovingInfos);
+                chosenTarget.storageType, blockMovingInfos,
+                ecPolicy);
           } else {
             buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
                 existingTypeNodePair.storageType, chosenTarget.dn,
@@ -596,7 +579,7 @@ public class StoragePolicySatisfier implements Runnable {
       if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
         continue;
       }
-      if (chosenTarget == null && blockManager.getDatanodeManager()
+      if (chosenTarget == null && ctxt
           .getNetworkTopology().isNodeGroupAware()) {
         chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
             expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
@@ -619,7 +602,7 @@ public class StoragePolicySatisfier implements Runnable {
         if (blockInfo.isStriped()) {
           buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
               existingTypeNodePair.storageType, chosenTarget.dn,
-              chosenTarget.storageType, blockMovingInfos);
+              chosenTarget.storageType, blockMovingInfos, ecPolicy);
         } else {
           buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
               existingTypeNodePair.storageType, chosenTarget.dn,
@@ -645,7 +628,7 @@ public class StoragePolicySatisfier implements Runnable {
   }
 
   private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
-      DatanodeDescriptor dn) {
+      DatanodeInfo dn) {
     for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
       if (blockMovingInfo.getSource().equals(dn)) {
         return true;
@@ -654,37 +637,40 @@ public class StoragePolicySatisfier implements Runnable {
     return false;
   }
 
-  private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
+  private void buildContinuousBlockMovingInfos(LocatedBlock blockInfo,
       DatanodeInfo sourceNode, StorageType sourceStorageType,
       DatanodeInfo targetNode, StorageType targetStorageType,
       List<BlockMovingInfo> blkMovingInfos) {
-    Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
-        blockInfo.getGenerationStamp());
+    Block blk = ExtendedBlock.getLocalBlock(blockInfo.getBlock());
     BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
         targetNode, sourceStorageType, targetStorageType);
     blkMovingInfos.add(blkMovingInfo);
   }
 
-  private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
+  private void buildStripedBlockMovingInfos(LocatedBlock blockInfo,
       DatanodeInfo sourceNode, StorageType sourceStorageType,
       DatanodeInfo targetNode, StorageType targetStorageType,
-      List<BlockMovingInfo> blkMovingInfos) {
+      List<BlockMovingInfo> blkMovingInfos, ErasureCodingPolicy ecPolicy) {
     // For a striped block, it needs to construct internal block at the given
     // index of a block group. Here it is iterating over all the block indices
     // and construct internal blocks which can be then considered for block
     // movement.
-    BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
-    for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
-      if (si.getBlockIndex() >= 0) {
-        DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
-        if (sourceNode.equals(dn)) {
+    LocatedStripedBlock sBlockInfo = (LocatedStripedBlock) blockInfo;
+    byte[] indices = sBlockInfo.getBlockIndices();
+    DatanodeInfo[] locations = sBlockInfo.getLocations();
+    for (int i = 0; i < indices.length; i++) {
+      byte blkIndex = indices[i];
+      if (blkIndex >= 0) {
+        // pick block movement only for the given source node.
+        if (sourceNode.equals(locations[i])) {
           // construct internal block
-          long blockId = blockInfo.getBlockId() + si.getBlockIndex();
+          ExtendedBlock extBlock = sBlockInfo.getBlock();
           long numBytes = StripedBlockUtil.getInternalBlockLength(
-              sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
-              sBlockInfo.getDataBlockNum(), si.getBlockIndex());
-          Block blk = new Block(blockId, numBytes,
-              blockInfo.getGenerationStamp());
+              extBlock.getNumBytes(), ecPolicy, blkIndex);
+          Block blk = new Block(ExtendedBlock.getLocalBlock(extBlock));
+          long blkId = blk.getBlockId() + blkIndex;
+          blk.setBlockId(blkId);
+          blk.setNumBytes(numBytes);
           BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
               targetNode, sourceStorageType, targetStorageType);
           blkMovingInfos.add(blkMovingInfo);
@@ -703,34 +689,35 @@ public class StoragePolicySatisfier implements Runnable {
    * @param targetTypes
    *          - list of target storage types
    */
-  private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
-      DatanodeDescriptor source, List<StorageType> targetTypes) {
+  private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
+      DatanodeInfo source, List<StorageType> targetTypes) {
     for (StorageType t : targetTypes) {
-      DatanodeStorageInfo chooseStorage4Block =
-          source.chooseStorage4Block(t, block.getNumBytes());
-      if (chooseStorage4Block != null) {
+      boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
+          source, t, blockInfo.getBlockSize());
+      if (goodTargetDn) {
         return new StorageTypeNodePair(t, source);
       }
     }
     return null;
   }
 
-  private StorageTypeNodePair chooseTarget(Block block,
-      DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
+  private StorageTypeNodePair chooseTarget(LocatedBlock block,
+      DatanodeInfo source, List<StorageType> targetTypes, Matcher matcher,
       StorageTypeNodeMap locsForExpectedStorageTypes,
-      List<DatanodeDescriptor> excludeNodes) {
+      List<DatanodeInfo> excludeNodes) {
     for (StorageType t : targetTypes) {
-      List<DatanodeDescriptor> nodesWithStorages =
-          locsForExpectedStorageTypes.getNodesWithStorages(t);
+      List<DatanodeInfo> nodesWithStorages = locsForExpectedStorageTypes
+          .getNodesWithStorages(t);
       if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
         continue; // no target nodes with the required storage type.
       }
       Collections.shuffle(nodesWithStorages);
-      for (DatanodeDescriptor target : nodesWithStorages) {
-        if (!excludeNodes.contains(target) && matcher.match(
-            blockManager.getDatanodeManager().getNetworkTopology(), source,
-            target)) {
-          if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
+      for (DatanodeInfo target : nodesWithStorages) {
+        if (!excludeNodes.contains(target)
+            && matcher.match(ctxt.getNetworkTopology(), source, target)) {
+          boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
+              target, t, block.getBlockSize());
+          if (goodTargetDn) {
             return new StorageTypeNodePair(t, target);
           }
         }
@@ -741,27 +728,25 @@ public class StoragePolicySatisfier implements Runnable {
 
   private static class StorageTypeNodePair {
     private StorageType storageType = null;
-    private DatanodeDescriptor dn = null;
+    private DatanodeInfo dn = null;
 
-    StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
+    StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
       this.storageType = storageType;
       this.dn = dn;
     }
   }
 
   private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
-      List<StorageType> expected) {
+      List<StorageType> expected, DatanodeStorageReport[] liveDns) {
     StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
-    List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
-        .getDatanodeListForReport(DatanodeReportType.LIVE);
-    for (DatanodeDescriptor dn : reports) {
+    for (DatanodeStorageReport dn : liveDns) {
       StorageReport[] storageReports = dn.getStorageReports();
       for (StorageReport storageReport : storageReports) {
         StorageType t = storageReport.getStorage().getStorageType();
         if (expected.contains(t)) {
           final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
           if (maxRemaining > 0L) {
-            targetMap.add(t, dn);
+            targetMap.add(t, dn.getDatanodeInfo());
           }
         }
       }
@@ -782,32 +767,40 @@ public class StoragePolicySatisfier implements Runnable {
     return max;
   }
 
-  private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
-      List<StorageType> existing, List<StorageType> expectedStorageTypes) {
-    DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
+  private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn,
+      List<StorageType> existing, List<StorageType> expectedStorageTypes,
+      DatanodeStorageReport[] liveDns) {
     boolean isExpectedTypeAvailable = false;
     boolean isExistingTypeAvailable = false;
-    for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
-      StorageType storageType = dnInfo.getStorageType();
-      if (existing.contains(storageType)) {
-        isExistingTypeAvailable = true;
-      }
-      if (expectedStorageTypes.contains(storageType)) {
-        isExpectedTypeAvailable = true;
+    for (DatanodeStorageReport liveDn : liveDns) {
+      if (dn.equals(liveDn.getDatanodeInfo())) {
+        StorageReport[] storageReports = liveDn.getStorageReports();
+        for (StorageReport eachStorage : storageReports) {
+          StorageType storageType = eachStorage.getStorage().getStorageType();
+          if (existing.contains(storageType)) {
+            isExistingTypeAvailable = true;
+          }
+          if (expectedStorageTypes.contains(storageType)) {
+            isExpectedTypeAvailable = true;
+          }
+          if (isExistingTypeAvailable && isExpectedTypeAvailable) {
+            return true;
+          }
+        }
       }
     }
     return isExistingTypeAvailable && isExpectedTypeAvailable;
   }
 
   private static class StorageTypeNodeMap {
-    private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
-        new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
+    private final EnumMap<StorageType, List<DatanodeInfo>> typeNodeMap =
+        new EnumMap<StorageType, List<DatanodeInfo>>(StorageType.class);
 
-    private void add(StorageType t, DatanodeDescriptor dn) {
-      List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
-      LinkedList<DatanodeDescriptor> value = null;
+    private void add(StorageType t, DatanodeInfo dn) {
+      List<DatanodeInfo> nodesWithStorages = getNodesWithStorages(t);
+      LinkedList<DatanodeInfo> value = null;
       if (nodesWithStorages == null) {
-        value = new LinkedList<DatanodeDescriptor>();
+        value = new LinkedList<DatanodeInfo>();
         value.add(dn);
         typeNodeMap.put(t, value);
       } else {
@@ -820,7 +813,7 @@ public class StoragePolicySatisfier implements Runnable {
      *          - Storage type
      * @return datanodes which has the given storage type
      */
-    private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
+    private List<DatanodeInfo> getNodesWithStorages(StorageType type) {
       return typeNodeMap.get(type);
     }
   }
@@ -982,7 +975,6 @@ public class StoragePolicySatisfier implements Runnable {
 
   public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
       String path) throws IOException {
-    INode inode = namesystem.getFSDirectory().getINode(path);
-    return storageMovementNeeded.getStatus(inode.getId());
+    return storageMovementNeeded.getStatus(ctxt.getFileID(path));
   }
 }
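
For reference, a minimal sketch (not part of this patch) of how the refactored SPS code can be driven through the new Context abstraction. It uses only Context methods and constructors visible in this change (getConf, isRunning, isInSafeMode, isFileExist, BlockStorageMovementNeeded(Context)) and mirrors the Mockito-based setup in the updated tests below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementNeeded;
    import org.apache.hadoop.hdfs.server.namenode.sps.Context;
    import org.mockito.Mockito;

    // Stub the pluggable SPS Context so the movement-needed queue can be
    // exercised without a live Namesystem/BlockManager.
    Configuration conf = new HdfsConfiguration();
    Context ctxt = Mockito.mock(Context.class);
    Mockito.when(ctxt.getConf()).thenReturn(conf);
    Mockito.when(ctxt.isRunning()).thenReturn(true);
    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);

    // The queue now takes only the Context; the Namesystem, the satisfier
    // and the queue limit are resolved through it.
    BlockStorageMovementNeeded movementNeeded =
        new BlockStorageMovementNeeded(ctxt);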

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 62766d9..f9762a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -25,8 +25,9 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
 import org.junit.After;
@@ -46,11 +47,15 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Before
   public void setup() throws Exception {
-    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
-        Mockito.mock(Namesystem.class),
-        Mockito.mock(StoragePolicySatisfier.class), 100);
-    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
-        selfRetryTimeout, unsatisfiedStorageMovementFiles);
+    Configuration config = new HdfsConfiguration();
+    Context ctxt = Mockito.mock(Context.class);
+    Mockito.when(ctxt.getConf()).thenReturn(config);
+    Mockito.when(ctxt.isRunning()).thenReturn(true);
+    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
+    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
+    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt);
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(ctxt,
+        unsatisfiedStorageMovementFiles);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hadoop/blob/113185ee/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 8dc52dc..2a7bde5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -105,7 +107,8 @@ public class TestStoragePolicySatisfier {
   private DistributedFileSystem dfs = null;
   private static final int DEFAULT_BLOCK_SIZE = 1024;
 
-  private void shutdownCluster() {
+  @After
+  public void shutdownCluster() {
     if (hdfsCluster != null) {
       hdfsCluster.shutdown();
     }
@@ -1298,11 +1301,17 @@ public class TestStoragePolicySatisfier {
     //entry in queue. After 10 files, traverse control will be on U.
     StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     Mockito.when(sps.isRunning()).thenReturn(true);
+    Context ctxt = Mockito.mock(Context.class);
+    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
+    Mockito.when(ctxt.getConf()).thenReturn(config);
+    Mockito.when(ctxt.isRunning()).thenReturn(true);
+    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
+    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
     BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
+        new BlockStorageMovementNeeded(ctxt);
     INode rootINode = fsDir.getINode("/root");
     movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
-    movmentNeededQueue.init();
+    movmentNeededQueue.init(fsDir);
 
     //Wait for thread to reach U.
     Thread.sleep(1000);
@@ -1361,9 +1370,15 @@ public class TestStoragePolicySatisfier {
     Mockito.when(sps.isRunning()).thenReturn(true);
     // Queue limit can control the traverse logic to wait for some free
     // entry in queue. After 10 files, traverse control will be on U.
+    Context ctxt = Mockito.mock(Context.class);
+    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
+    Mockito.when(ctxt.getConf()).thenReturn(config);
+    Mockito.when(ctxt.isRunning()).thenReturn(true);
+    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
+    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
     BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
-    movmentNeededQueue.init();
+        new BlockStorageMovementNeeded(ctxt);
+    movmentNeededQueue.init(fsDir);
     INode rootINode = fsDir.getINode("/root");
     movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
     // Wait for thread to reach U.

