HDFS-13381: [SPS]: Use DFSUtilClient#makePathFromFileId() to prepare satisfier file path. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8eabfc06
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8eabfc06
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8eabfc06

Branch: refs/heads/HDFS-10285
Commit: 8eabfc0698304b691c51512aaa511be21a74d56f
Parents: 93a0454
Author: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Authored: Mon Jul 2 17:22:00 2018 -0700
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Tue Jul 31 12:11:01 2018 +0530

----------------------------------------------------------------------
 .../NamenodeProtocolServerSideTranslatorPB.java |  2 +-
 .../NamenodeProtocolTranslatorPB.java           |  2 +-
 .../server/blockmanagement/BlockManager.java    |  2 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 11 ---
 .../hdfs/server/namenode/NameNodeRpcServer.java |  8 +-
 .../hadoop/hdfs/server/namenode/Namesystem.java |  9 ---
 .../sps/BlockStorageMovementAttemptedItems.java | 72 +++++++----------
 .../sps/BlockStorageMovementNeeded.java         | 61 ++++++--------
 .../hdfs/server/namenode/sps/Context.java       | 45 ++++++++---
 .../namenode/sps/DatanodeCacheManager.java      |  4 +-
 .../hdfs/server/namenode/sps/FileCollector.java | 13 +--
 .../namenode/sps/IntraSPSNameNodeContext.java   | 54 +++++++++----
 .../sps/IntraSPSNameNodeFileIdCollector.java    | 14 ++--
 .../hdfs/server/namenode/sps/ItemInfo.java      | 34 ++++----
 .../hdfs/server/namenode/sps/SPSService.java    | 31 +++----
 .../namenode/sps/StoragePolicySatisfier.java    | 61 +++++---------
 .../sps/StoragePolicySatisfyManager.java        | 20 ++---
 .../hdfs/server/protocol/NamenodeProtocol.java  |  2 +-
 .../sps/ExternalSPSBlockMoveTaskHandler.java    |  4 +-
 .../hdfs/server/sps/ExternalSPSContext.java     | 85 ++++++++++++++++----
 .../sps/ExternalSPSFilePathCollector.java       | 36 +++++----
 .../sps/ExternalStoragePolicySatisfier.java     | 30 +------
 .../src/main/proto/NamenodeProtocol.proto       |  2 +-
 .../TestBlockStorageMovementAttemptedItems.java | 16 ++--
 .../sps/TestStoragePolicySatisfier.java         | 66 +++++----------
 ...stStoragePolicySatisfierWithStripedFile.java | 41 ++++------
 .../sps/TestExternalStoragePolicySatisfier.java | 35 +++-----
 27 files changed, 346 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
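
For reference, a minimal sketch (not part of this patch; the inode id below is
made up) of the helper the change builds on. DFSUtilClient#makePathFromFileId
maps an inode id to the reserved-inode path "/.reserved/.inodes/<id>", which the
NameNode resolves like any other path, so the satisfier can pass plain Long file
ids around instead of full path strings:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSUtilClient;

    public class MakePathFromFileIdSketch {
      public static void main(String[] args) {
        long fileId = 16386L; // hypothetical inode id
        // Yields "/.reserved/.inodes/16386"; HDFS resolves this reserved
        // path back to the inode, so no id-to-path lookup is needed.
        Path satisfierPath = DFSUtilClient.makePathFromFileId(fileId);
        System.out.println(satisfierPath);
      }
    }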


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index e4283c6..d9367fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -265,7 +265,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, GetNextSPSPathRequestProto request)
           throws ServiceException {
     try {
-      String nextSPSPath = impl.getNextSPSPath();
+      Long nextSPSPath = impl.getNextSPSPath();
       if (nextSPSPath == null) {
         return GetNextSPSPathResponseProto.newBuilder().build();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 97dee9b..3bd5986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -267,7 +267,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
   }
 
   @Override
-  public String getNextSPSPath() throws IOException {
+  public Long getNextSPSPath() throws IOException {
     GetNextSPSPathRequestProto req =
         GetNextSPSPathRequestProto.newBuilder().build();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index caf250f..cb0de67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3897,7 +3897,7 @@ public class BlockManager implements BlockStatsMXBean {
   private void notifyStorageMovementAttemptFinishedBlk(
       DatanodeStorageInfo storageInfo, Block block) {
     if (getSPSManager() != null) {
-      SPSService<Long> sps = getSPSManager().getInternalSPSService();
+      SPSService sps = getSPSManager().getInternalSPSService();
       if (sps.isRunning()) {
         sps.notifyStorageMovementAttemptFinishedBlk(
             storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 9aa7a7e..ddb438d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3202,17 +3202,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return stat;
   }
 
-  @Override
-  public String getFilePath(Long inodeId) {
-    readLock();
-    try {
-      INode inode = getFSDirectory().getInode(inodeId);
-      return inode == null ? null : inode.getFullPathName();
-    } finally {
-      readUnlock();
-    }
-  }
-
   /**
    * Returns true if the file is closed
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57e827d..2f3325f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2569,7 +2569,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override
-  public String getNextSPSPath() throws IOException {
+  public Long getNextSPSPath() throws IOException {
     checkNNStartup();
     String operationName = "getNextSPSPath";
     namesystem.checkSuperuserPrivilege(operationName);
@@ -2589,10 +2589,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("SPS service mode is " + spsMode + ", so "
           + "external SPS service is not allowed to fetch the path Ids");
     }
-    Long pathId = spsMgr.getNextPathId();
-    if (pathId == null) {
-      return null;
-    }
-    return namesystem.getFilePath(pathId);
+    return namesystem.getBlockManager().getSPSManager().getNextPathId();
   }
 }
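
With getNextSPSPath() now returning the inode id itself, rebuilding a usable
path moves to the caller. A hedged sketch (assumed caller-side code, not part of
this patch) of how an external consumer of this RPC might do that:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

    public final class NextSPSPathSketch {
      // Polls one pending satisfier file id and turns it into a path that the
      // regular client APIs can resolve; returns null when nothing is queued.
      static Path nextSatisfierPath(NamenodeProtocol nn) throws IOException {
        Long inodeId = nn.getNextSPSPath();
        return inodeId == null ? null : DFSUtilClient.makePathFromFileId(inodeId);
      }
    }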

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index fc933b7..82af4d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -77,13 +77,4 @@ public interface Namesystem extends RwLock, SafeMode {
    */
   HdfsFileStatus getFileInfo(String filePath, boolean resolveLink,
       boolean needLocation) throws IOException;
-
-  /**
-   * Gets the file path corresponds to the given file id.
-   *
-   * @param inodeId
-   *          file id
-   * @return string file path
-   */
-  String getFilePath(Long inodeId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index 5b25491..df4f0dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -52,13 +52,8 @@ import com.google.common.annotations.VisibleForTesting;
  * entries from tracking. If there is no DN reports about movement attempt
 * finished for a longer time period, then such items will retries automatically
  * after timeout. The default timeout would be 5 minutes.
- *
- * @param <T>
- *          is identifier of inode or full path name of inode. Internal sps will
- *          use the file inodeId for the block movement. External sps will use
- *          file string path representation for the block movement.
  */
-public class BlockStorageMovementAttemptedItems<T> {
+public class BlockStorageMovementAttemptedItems {
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
 
@@ -66,14 +61,14 @@ public class BlockStorageMovementAttemptedItems<T> {
    * A map holds the items which are already taken for blocks movements
    * processing and sent to DNs.
    */
-  private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
+  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
   private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs;
   // Maintains separate Queue to keep the movement finished blocks. This Q
   // is used to update the storageMovementAttemptedItems list asynchronously.
   private final BlockingQueue<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
-  private BlockMovementListener blkMovementListener;
+  private final Context context;
   //
   // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
@@ -85,12 +80,12 @@ public class BlockStorageMovementAttemptedItems<T> {
   // a request is timed out.
   //
   private long minCheckTimeout = 1 * 60 * 1000; // minimum value
-  private BlockStorageMovementNeeded<T> blockStorageMovementNeeded;
-  private final SPSService<T> service;
+  private BlockStorageMovementNeeded blockStorageMovementNeeded;
+  private final SPSService service;
 
-  public BlockStorageMovementAttemptedItems(SPSService<T> service,
-      BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles,
-      BlockMovementListener blockMovementListener) {
+  public BlockStorageMovementAttemptedItems(SPSService service,
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+      Context context) {
     this.service = service;
     long recheckTimeout = this.service.getConf().getLong(
         DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
@@ -106,19 +101,27 @@ public class BlockStorageMovementAttemptedItems<T> {
     storageMovementAttemptedItems = new ArrayList<>();
     scheduledBlkLocs = new HashMap<>();
     movementFinishedBlocks = new LinkedBlockingQueue<>();
-    this.blkMovementListener = blockMovementListener;
+    this.context = context;
   }
 
   /**
    * Add item to block storage movement attempted items map which holds the
    * tracking/blockCollection id versus time stamp.
    *
-   * @param itemInfo
-   *          - tracking info
+   * @param startPathId
+   *          - start satisfier path identifier
+   * @param fileId
+   *          - file identifier
+   * @param monotonicNow
+   *          - time now
+   * @param assignedBlocks
+   *          - assigned blocks for block movement
+   * @param retryCount
+   *          - retry count
    */
-  public void add(T startPath, T file, long monotonicNow,
+  public void add(long startPathId, long fileId, long monotonicNow,
       Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) {
-    AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file,
+    AttemptedItemInfo itemInfo = new AttemptedItemInfo(startPathId, fileId,
         monotonicNow, assignedBlocks.keySet(), retryCount);
     synchronized (storageMovementAttemptedItems) {
       storageMovementAttemptedItems.add(itemInfo);
@@ -161,11 +164,9 @@ public class BlockStorageMovementAttemptedItems<T> {
       boolean foundType = dn.getStorageType().equals(type);
       if (foundDn && foundType) {
         blkLocs.remove(dn);
-        // listener if it is plugged-in
-        if (blkMovementListener != null) {
-          blkMovementListener
-              .notifyMovementTriedBlocks(new Block[] {reportedBlock});
-        }
+        Block[] mFinishedBlocks = new Block[1];
+        mFinishedBlocks[0] = reportedBlock;
+        context.notifyMovementTriedBlocks(mFinishedBlocks);
         // All the block locations has reported.
         if (blkLocs.size() <= 0) {
           movementFinishedBlocks.add(reportedBlock);
@@ -244,15 +245,15 @@ public class BlockStorageMovementAttemptedItems<T> {
   @VisibleForTesting
   void blocksStorageMovementUnReportedItemsCheck() {
     synchronized (storageMovementAttemptedItems) {
-      Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems
+      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
           .iterator();
       long now = monotonicNow();
       while (iter.hasNext()) {
-        AttemptedItemInfo<T> itemInfo = iter.next();
+        AttemptedItemInfo itemInfo = iter.next();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
-          T file = itemInfo.getFile();
-          ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file,
+          long file = itemInfo.getFile();
+          ItemInfo candidate = new ItemInfo(itemInfo.getStartPath(), file,
               itemInfo.getRetryCount() + 1);
           blockStorageMovementNeeded.add(candidate);
           iter.remove();
@@ -272,13 +273,13 @@ public class BlockStorageMovementAttemptedItems<T> {
     // Update attempted items list
     for (Block blk : finishedBlks) {
       synchronized (storageMovementAttemptedItems) {
-        Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems
+        Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
             .iterator();
         while (iterator.hasNext()) {
-          AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
+          AttemptedItemInfo attemptedItemInfo = iterator.next();
           attemptedItemInfo.getBlocks().remove(blk);
           if (attemptedItemInfo.getBlocks().isEmpty()) {
-            blockStorageMovementNeeded.add(new ItemInfo<T>(
+            blockStorageMovementNeeded.add(new ItemInfo(
                 attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(),
                 attemptedItemInfo.getRetryCount() + 1));
             iterator.remove();
@@ -309,15 +310,4 @@ public class BlockStorageMovementAttemptedItems<T> {
       scheduledBlkLocs.clear();
     }
   }
-
-  /**
-   * Sets external listener for testing.
-   *
-   * @param blkMoveListener
-   *          block movement listener callback object
-   */
-  @VisibleForTesting
-  void setBlockMovementListener(BlockMovementListener blkMoveListener) {
-    this.blkMovementListener = blkMoveListener;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index a194876..c95dcda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -43,47 +43,38 @@ import com.google.common.annotations.VisibleForTesting;
  * schedule the block collection IDs for movement. It track the info of
  * scheduled items and remove the SPS xAttr from the file/Directory once
  * movement is success.
- *
- * @param <T>
- *          is identifier of inode or full path name of inode. Internal sps will
- *          use the file inodeId for the block movement. External sps will use
- *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
-public class BlockStorageMovementNeeded<T> {
+public class BlockStorageMovementNeeded {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
 
-  private final Queue<ItemInfo<T>> storageMovementNeeded =
-      new LinkedList<ItemInfo<T>>();
+  private final Queue<ItemInfo> storageMovementNeeded =
+      new LinkedList<ItemInfo>();
 
   /**
    * Map of startPath and number of child's. Number of child's indicate the
    * number of files pending to satisfy the policy.
    */
-  private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory =
+  private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
       new HashMap<>();
 
-  private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus =
+  private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
       new ConcurrentHashMap<>();
 
-  private final Context<T> ctxt;
+  private final Context ctxt;
 
   private Daemon pathIdCollector;
 
-  private FileCollector<T> fileCollector;
-
   private SPSPathIdProcessor pathIDProcessor;
 
   // Amount of time to cache the SUCCESS status of path before turning it to
   // NOT_AVAILABLE.
   private static long statusClearanceElapsedTimeMs = 300000;
 
-  public BlockStorageMovementNeeded(Context<T> context,
-      FileCollector<T> fileCollector) {
+  public BlockStorageMovementNeeded(Context context) {
     this.ctxt = context;
-    this.fileCollector = fileCollector;
     pathIDProcessor = new SPSPathIdProcessor();
   }
 
@@ -94,7 +85,7 @@ public class BlockStorageMovementNeeded<T> {
    * @param trackInfo
    *          - track info for satisfy the policy
    */
-  public synchronized void add(ItemInfo<T> trackInfo) {
+  public synchronized void add(ItemInfo trackInfo) {
     spsStatus.put(trackInfo.getFile(),
         new StoragePolicySatisfyPathStatusInfo(
             StoragePolicySatisfyPathStatus.IN_PROGRESS));
@@ -114,7 +105,7 @@ public class BlockStorageMovementNeeded<T> {
    *          scan.
    */
   @VisibleForTesting
-  public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList,
+  public synchronized void addAll(long startPath, List<ItemInfo> itemInfoList,
       boolean scanCompleted) {
     storageMovementNeeded.addAll(itemInfoList);
     updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
@@ -131,7 +122,7 @@ public class BlockStorageMovementNeeded<T> {
    *          elements to scan.
    */
   @VisibleForTesting
-  public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) {
+  public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
     storageMovementNeeded.add(itemInfo);
     // This represents sps start id is file, so no need to update pending dir
     // stats.
@@ -141,7 +132,7 @@ public class BlockStorageMovementNeeded<T> {
     updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
   }
 
-  private void updatePendingDirScanStats(T startPath, int numScannedFiles,
+  private void updatePendingDirScanStats(long startPath, int numScannedFiles,
       boolean scanCompleted) {
     DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
     if (pendingWork == null) {
@@ -160,7 +151,7 @@ public class BlockStorageMovementNeeded<T> {
    *
    * @return satisfier files
    */
-  public synchronized ItemInfo<T> get() {
+  public synchronized ItemInfo get() {
     return storageMovementNeeded.poll();
   }
 
@@ -181,12 +172,12 @@ public class BlockStorageMovementNeeded<T> {
    * Decrease the pending child count for directory once one file blocks moved
    * successfully. Remove the SPS xAttr if pending child count is zero.
    */
-  public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo,
+  public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
       boolean isSuccess) throws IOException {
     if (trackInfo.isDir()) {
       // If track is part of some start inode then reduce the pending
       // directory work count.
-      T startId = trackInfo.getStartPath();
+      long startId = trackInfo.getStartPath();
       if (!ctxt.isFileExist(startId)) {
         // directory deleted just remove it.
         this.pendingWorkForDirectory.remove(startId);
@@ -212,11 +203,11 @@ public class BlockStorageMovementNeeded<T> {
     }
   }
 
-  public synchronized void clearQueue(T trackId) {
+  public synchronized void clearQueue(long trackId) {
     ctxt.removeSPSPathId(trackId);
-    Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator();
+    Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
     while (iterator.hasNext()) {
-      ItemInfo<T> next = iterator.next();
+      ItemInfo next = iterator.next();
       if (next.getFile() == trackId) {
         iterator.remove();
       }
@@ -227,7 +218,7 @@ public class BlockStorageMovementNeeded<T> {
   /**
    * Mark inode status as SUCCESS in map.
    */
-  private void updateStatus(T startId, boolean isSuccess){
+  private void updateStatus(long startId, boolean isSuccess){
     StoragePolicySatisfyPathStatusInfo spsStatusInfo =
         spsStatus.get(startId);
     if (spsStatusInfo == null) {
@@ -249,7 +240,7 @@ public class BlockStorageMovementNeeded<T> {
    */
   public synchronized void clearQueuesWithNotification() {
     // Remove xAttr from directories
-    T trackId;
+    Long trackId;
     while ((trackId = ctxt.getNextSPSPath()) != null) {
       try {
         // Remove xAttr for file
@@ -261,7 +252,7 @@ public class BlockStorageMovementNeeded<T> {
 
     // File's directly added to storageMovementNeeded, So try to remove
     // xAttr for file
-    ItemInfo<T> itemInfo;
+    ItemInfo itemInfo;
     while ((itemInfo = get()) != null) {
       try {
         // Remove xAttr for file
@@ -287,7 +278,7 @@ public class BlockStorageMovementNeeded<T> {
     public void run() {
       LOG.info("Starting SPSPathIdProcessor!.");
       long lastStatusCleanTime = 0;
-      T startINode = null;
+      Long startINode = null;
       while (ctxt.isRunning()) {
         try {
           if (!ctxt.isInSafeMode()) {
@@ -301,7 +292,7 @@ public class BlockStorageMovementNeeded<T> {
               spsStatus.put(startINode,
                   new StoragePolicySatisfyPathStatusInfo(
                       StoragePolicySatisfyPathStatus.IN_PROGRESS));
-              fileCollector.scanAndCollectFiles(startINode);
+              ctxt.scanAndCollectFiles(startINode);
               // check if directory was empty and no child added to queue
               DirPendingWorkInfo dirPendingWorkInfo =
                   pendingWorkForDirectory.get(startINode);
@@ -339,9 +330,9 @@ public class BlockStorageMovementNeeded<T> {
     }
 
     private synchronized void cleanSPSStatus() {
-      for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus
-          .entrySet().iterator(); it.hasNext();) {
-        Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next();
+      for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
+          spsStatus.entrySet().iterator(); it.hasNext();) {
+        Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
         if (entry.getValue().canRemove()) {
           it.remove();
         }
@@ -477,7 +468,7 @@ public class BlockStorageMovementNeeded<T> {
     return statusClearanceElapsedTimeMs;
   }
 
-  public void markScanCompletedForDir(T inode) {
+  public void markScanCompletedForDir(long inode) {
     DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
     if (pendingWork != null) {
       pendingWork.markScanCompleted();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index 55a1f7a..d538374 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -24,24 +24,21 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
 
 /**
  * An interface for the communication between SPS and Namenode module.
- *
- * @param <T>
- *          is identifier of inode or full path name of inode. Internal sps will
- *          use the file inodeId for the block movement. External sps will use
- *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface Context<T> {
+public interface Context {
 
   /**
    * Returns true if the SPS is running, false otherwise.
@@ -85,7 +82,7 @@ public interface Context<T> {
    *          - file info
    * @return true if the given file exists, false otherwise.
    */
-  boolean isFileExist(T filePath);
+  boolean isFileExist(long filePath);
 
   /**
    * Gets the storage policy details for the given policy ID.
@@ -108,7 +105,7 @@ public interface Context<T> {
    *          - user invoked satisfier path
    * @throws IOException
    */
-  void removeSPSHint(T spsPath) throws IOException;
+  void removeSPSHint(long spsPath) throws IOException;
 
   /**
    * Gets the number of live datanodes in the cluster.
@@ -124,7 +121,7 @@ public interface Context<T> {
    *          file path
    * @return file status metadata information
    */
-  HdfsFileStatus getFileInfo(T file) throws IOException;
+  HdfsFileStatus getFileInfo(long file) throws IOException;
 
   /**
    * Returns all the live datanodes and its storage details.
@@ -137,15 +134,41 @@ public interface Context<T> {
   /**
    * @return next SPS path info to process.
    */
-  T getNextSPSPath();
+  Long getNextSPSPath();
 
   /**
    * Removes the SPS path id.
    */
-  void removeSPSPathId(T pathId);
+  void removeSPSPathId(long pathId);
 
   /**
    * Removes all SPS path ids.
    */
   void removeAllSPSPathIds();
+
+  /**
+   * Do scan and collects the files under that directory and adds to the given
+   * BlockStorageMovementNeeded.
+   *
+   * @param filePath
+   *          file path
+   */
+  void scanAndCollectFiles(long filePath)
+      throws IOException, InterruptedException;
+
+  /**
+   * Handles the block move tasks. BlockMovingInfo must contain the required
+   * info to move the block, that source location, destination location and
+   * storage types.
+   */
+  void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException;
+
+  /**
+   * This can be used to notify to the SPS about block movement attempt
+   * finished. Then SPS will re-check whether it needs retry or not.
+   *
+   * @param moveAttemptFinishedBlks
+   *          list of movement attempt finished blocks
+   */
+  void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks);
 }
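
With scanning (scanAndCollectFiles), block move submission (submitMoveTask) and
movement notifications (notifyMovementTriedBlocks) folded into Context, wiring a
satisfier shrinks to a one-argument init(). A schematic sketch of the
internal-mode wiring, mirroring the StoragePolicySatisfyManager hunks later in
this patch; it assumes code living in the same sps package, with conf,
namesystem and blkMgr already in scope:

    // The context now bundles the former FileCollector, BlockMoveTaskHandler
    // and BlockMovementListener collaborators, so only it is passed to init().
    StoragePolicySatisfier sps = new StoragePolicySatisfier(conf);
    sps.init(new IntraSPSNameNodeContext(namesystem, blkMgr, sps));
    sps.start(false, StoragePolicySatisfierMode.INTERNAL);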

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
index 3531ecd..d4e514b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
  * interval.
  */
 @InterfaceAudience.Private
-public class DatanodeCacheManager<T> {
+public class DatanodeCacheManager {
   private static final Logger LOG = LoggerFactory
       .getLogger(DatanodeCacheManager.class);
 
@@ -78,7 +78,7 @@ public class DatanodeCacheManager<T> {
    * @throws IOException
    */
   public DatanodeMap getLiveDatanodeStorageReport(
-      Context<T> spsContext) throws IOException {
+      Context spsContext) throws IOException {
     long now = Time.monotonicNow();
     long elapsedTimeMs = now - lastAccessedTime;
     boolean refreshNeeded = elapsedTimeMs >= refreshIntervalMs;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
index dceb5fa..fa8b31b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
@@ -26,23 +26,18 @@ import org.apache.hadoop.classification.InterfaceStability;
 /**
  * An interface for scanning the directory recursively and collect files
  * under the given directory.
- *
- * @param <T>
- *          is identifier of inode or full path name of inode. Internal sps will
- *          use the file inodeId for the block movement. External sps will use
- *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface FileCollector<T> {
+public interface FileCollector {
 
   /**
    * This method can be used to scan and collects the files under that
    * directory and adds to the given BlockStorageMovementNeeded.
    *
-   * @param filePath
-   *          - file path
+   * @param path
+   *          - file path id
    */
-  void scanAndCollectFiles(T filePath)
+  void scanAndCollectFiles(long path)
       throws IOException, InterruptedException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index a77fe85..2bf4810 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.server.namenode.sps;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 
 import java.io.IOException;
+import java.util.Arrays;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -32,6 +35,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
 import org.apache.hadoop.net.NetworkTopology;
@@ -45,20 +49,26 @@ import org.slf4j.LoggerFactory;
  * movements to satisfy the storage policy.
  */
 @InterfaceAudience.Private
-public class IntraSPSNameNodeContext implements Context<Long> {
+public class IntraSPSNameNodeContext implements Context {
   private static final Logger LOG = LoggerFactory
       .getLogger(IntraSPSNameNodeContext.class);
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
 
-  private SPSService<Long> service;
+  private SPSService service;
+  private final FileCollector fileCollector;
+  private final BlockMoveTaskHandler blockMoveTaskHandler;
 
   public IntraSPSNameNodeContext(Namesystem namesystem,
-      BlockManager blockManager, SPSService<Long> service) {
+      BlockManager blockManager, SPSService service) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     this.service = service;
+    fileCollector = new IntraSPSNameNodeFileIdCollector(
+        namesystem.getFSDirectory(), service);
+    blockMoveTaskHandler = new IntraSPSNameNodeBlockMoveTaskHandler(
+        blockManager, namesystem);
   }
 
   @Override
@@ -67,17 +77,12 @@ public class IntraSPSNameNodeContext implements Context<Long> {
   }
 
   /**
-   * @return object containing information regarding the file or null if file
-   *         not found.
+   * @return object containing information regarding the file.
    */
   @Override
-  public HdfsFileStatus getFileInfo(Long inodeID) throws IOException {
-    String filePath = namesystem.getFilePath(inodeID);
-    if (StringUtils.isBlank(filePath)) {
-      LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
-      return null;
-    }
-    return namesystem.getFileInfo(filePath, true, true);
+  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+    Path filePath = DFSUtilClient.makePathFromFileId(inodeID);
+    return namesystem.getFileInfo(filePath.toString(), true, true);
   }
 
   @Override
@@ -93,12 +98,12 @@ public class IntraSPSNameNodeContext implements Context<Long> {
   }
 
   @Override
-  public boolean isFileExist(Long inodeId) {
+  public boolean isFileExist(long inodeId) {
     return namesystem.getFSDirectory().getInode(inodeId) != null;
   }
 
   @Override
-  public void removeSPSHint(Long inodeId) throws IOException {
+  public void removeSPSHint(long inodeId) throws IOException {
     this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
   }
 
@@ -156,7 +161,7 @@ public class IntraSPSNameNodeContext implements Context<Long> {
   }
 
   @Override
-  public void removeSPSPathId(Long trackId) {
+  public void removeSPSPathId(long trackId) {
     blockManager.getSPSManager().removePathId(trackId);
   }
 
@@ -164,4 +169,21 @@ public class IntraSPSNameNodeContext implements Context<Long> {
   public void removeAllSPSPathIds() {
     blockManager.getSPSManager().removeAllPathIds();
   }
+
+  @Override
+  public void scanAndCollectFiles(long filePath)
+      throws IOException, InterruptedException {
+    fileCollector.scanAndCollectFiles(filePath);
+  }
+
+  @Override
+  public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
+    blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
+  }
+
+  @Override
+  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+    LOG.info("Movement attempted blocks: {}",
+        Arrays.asList(moveAttemptFinishedBlks));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index 27d9e7d..ea3b96f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -35,16 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
  */
 @InterfaceAudience.Private
 public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
-    implements FileCollector<Long> {
+    implements FileCollector {
   private int maxQueueLimitToScan;
-  private final SPSService <Long> service;
+  private final SPSService service;
 
   private int remainingCapacity = 0;
 
-  private List<ItemInfo<Long>> currentBatch;
+  private List<ItemInfo> currentBatch;
 
   public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
-      SPSService<Long> service) {
+      SPSService service) {
     super(dir);
     this.service = service;
     this.maxQueueLimitToScan = service.getConf().getInt(
@@ -64,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
       return false;
     }
     if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-      currentBatch.add(new ItemInfo<Long>(
+      currentBatch.add(new ItemInfo(
           ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
       remainingCapacity--;
     }
@@ -120,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
   }
 
   @Override
-  public void scanAndCollectFiles(final Long startINodeId)
+  public void scanAndCollectFiles(final long startINodeId)
       throws IOException, InterruptedException {
     FSDirectory fsd = getFSDirectory();
     INode startInode = fsd.getInode(startINodeId);
@@ -131,7 +131,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
       }
       if (startInode.isFile()) {
         currentBatch
-            .add(new ItemInfo<Long>(startInode.getId(), startInode.getId()));
+            .add(new ItemInfo(startInode.getId(), startInode.getId()));
       } else {
         readLock();
         // NOTE: this lock will not be held for full directory scanning. It is

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
index bd8ab92..949e3fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
@@ -21,28 +21,26 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * ItemInfo is a file info object for which need to satisfy the policy. For
- * internal satisfier service, it uses inode id which is Long datatype. For the
- * external satisfier service, it uses the full string representation of the
- * path.
+ * ItemInfo is a file info object for which need to satisfy the policy.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ItemInfo<T> {
-  private T startPath;
-  private T file;
+public class ItemInfo {
+  private long startPathId;
+  private long fileId;
   private int retryCount;
 
-  public ItemInfo(T startPath, T file) {
-    this.startPath = startPath;
-    this.file = file;
+  public ItemInfo(long startPathId, long fileId) {
+    this.startPathId = startPathId;
+    this.fileId = fileId;
     // set 0 when item is getting added first time in queue.
     this.retryCount = 0;
   }
 
-  public ItemInfo(final T startPath, final T file, final int retryCount) {
-    this.startPath = startPath;
-    this.file = file;
+  public ItemInfo(final long startPathId, final long fileId,
+      final int retryCount) {
+    this.startPathId = startPathId;
+    this.fileId = fileId;
     this.retryCount = retryCount;
   }
 
@@ -50,22 +48,22 @@ public class ItemInfo<T> {
    * Returns the start path of the current file. This indicates that SPS
    * was invoked on this path.
    */
-  public T getStartPath() {
-    return startPath;
+  public long getStartPath() {
+    return startPathId;
   }
 
   /**
    * Returns the file for which needs to satisfy the policy.
    */
-  public T getFile() {
-    return file;
+  public long getFile() {
+    return fileId;
   }
 
   /**
    * Returns true if the tracking path is a directory, false otherwise.
    */
   public boolean isDir() {
-    return !startPath.equals(file);
+    return !(startPathId == fileId);
   }
 
   /**
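
The long-based isDir() above keeps the old contract: an item whose start path id
differs from its file id was queued through a directory scan. A tiny
illustration, with hypothetical inode ids:

    ItemInfo underDir = new ItemInfo(16385L, 16390L); // dir scan queued a child
    ItemInfo direct = new ItemInfo(16400L, 16400L);   // SPS called on the file
    assert underDir.isDir();   // startPathId != fileId
    assert !direct.isDir();    // startPathId == fileId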

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index 5032377..86634d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -29,15 +29,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 
 /**
  * An interface for SPSService, which exposes life cycle and processing APIs.
- *
- * @param <T>
- *          is identifier of inode or full path name of inode. Internal sps will
- *          use the file inodeId for the block movement. External sps will use
- *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface SPSService<T> {
+public interface SPSService {
 
   /**
    * Initializes the helper services.
@@ -45,16 +40,8 @@ public interface SPSService<T> {
    * @param ctxt
    *          - context is an helper service to provide communication channel
    *          between NN and SPS
-   * @param fileCollector
-   *          - a helper service for scanning the files under a given directory
-   *          id
-   * @param handler
-   *          - a helper service for moving the blocks
-   * @param blkMovementListener
-   *          - listener to know about block movement attempt completion
    */
-  void init(Context<T> ctxt, FileCollector<T> fileCollector,
-      BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
+  void init(Context ctxt);
 
   /**
    * Starts the SPS service. Make sure to initialize the helper services before
@@ -94,19 +81,19 @@ public interface SPSService<T> {
    * @param itemInfo
    *          file info object for which need to satisfy the policy
    */
-  void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted);
+  void addFileToProcess(ItemInfo itemInfo, boolean scanCompleted);
 
   /**
    * Adds all the Item information(file etc) to processing queue.
    *
-   * @param startPath
-   *          - directory/file, on which SPS was called.
+   * @param startPathId
+   *          - directoryId/fileId, on which SPS was called.
    * @param itemInfoList
    *          - list of item infos
    * @param scanCompleted
    *          - whether the scanning of directory fully done with itemInfoList
    */
-  void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
+  void addAllFilesToProcess(long startPathId, List<ItemInfo> itemInfoList,
       boolean scanCompleted);
 
   /**
@@ -117,7 +104,7 @@ public interface SPSService<T> {
   /**
    * Clear inodeId present in the processing queue.
    */
-  void clearQueue(T spsPath);
+  void clearQueue(long spsPath);
 
   /**
    * @return the configuration.
@@ -128,9 +115,9 @@ public interface SPSService<T> {
    * Marks the scanning of directory if finished.
    *
    * @param spsPath
-   *          - satisfier path
+   *          - satisfier path id
    */
-  void markScanCompletedForPath(T spsPath);
+  void markScanCompletedForPath(long spsPath);
 
   /**
    * Given node is reporting that it received a certain movement attempt

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index cbd6001..4af6c8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -78,20 +78,19 @@ import com.google.common.base.Preconditions;
  * physical block movements.
  */
 @InterfaceAudience.Private
-public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
+public class StoragePolicySatisfier implements SPSService, Runnable {
   public static final Logger LOG =
       LoggerFactory.getLogger(StoragePolicySatisfier.class);
   private Daemon storagePolicySatisfierThread;
-  private BlockStorageMovementNeeded<T> storageMovementNeeded;
-  private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
+  private BlockStorageMovementNeeded storageMovementNeeded;
+  private BlockStorageMovementAttemptedItems storageMovementsMonitor;
   private volatile boolean isRunning = false;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
-  private Context<T> ctxt;
-  private BlockMoveTaskHandler blockMoveTaskHandler;
+  private Context ctxt;
   private final Configuration conf;
-  private DatanodeCacheManager<T> dnCacheMgr;
+  private DatanodeCacheManager dnCacheMgr;
 
   public StoragePolicySatisfier(Configuration conf) {
     this.conf = conf;
@@ -137,16 +136,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     }
   }
 
-  public void init(final Context<T> context,
-      final FileCollector<T> fileIDCollector,
-      final BlockMoveTaskHandler blockMovementTaskHandler,
-      final BlockMovementListener blockMovementListener) {
+  public void init(final Context context) {
     this.ctxt = context;
-    this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context,
-        fileIDCollector);
-    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>(
-        this, storageMovementNeeded, blockMovementListener);
-    this.blockMoveTaskHandler = blockMovementTaskHandler;
+    this.storageMovementNeeded = new BlockStorageMovementNeeded(context);
+    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
+        this, storageMovementNeeded, context);
     this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
     this.blockMovementMaxRetry = getConf().getInt(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
@@ -191,7 +185,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     storagePolicySatisfierThread.start();
     this.storageMovementsMonitor.start();
     this.storageMovementNeeded.activate();
-    dnCacheMgr = new DatanodeCacheManager<T>(conf);
+    dnCacheMgr = new DatanodeCacheManager(conf);
   }
 
   @Override
@@ -259,7 +253,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         continue;
       }
       try {
-        ItemInfo<T> itemInfo = null;
+        ItemInfo itemInfo = null;
         boolean retryItem = false;
         if (!ctxt.isInSafeMode()) {
           itemInfo = storageMovementNeeded.get();
@@ -271,7 +265,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
               storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
               continue;
             }
-            T trackId = itemInfo.getFile();
+            long trackId = itemInfo.getFile();
             BlocksMovingAnalysis status = null;
             BlockStoragePolicy existingStoragePolicy;
             // TODO: presently, context internally acquire the lock
@@ -353,7 +347,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
           blockCount = 0L;
         }
         if (retryItem) {
-          itemInfo.increRetryCount();
+          // itemInfo.increRetryCount();
           this.storageMovementNeeded.add(itemInfo);
         }
       } catch (IOException e) {
@@ -469,7 +463,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for at least one block storage movement has been chosen
       try {
-        blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
+        ctxt.submitMoveTask(blkMovingInfo);
         LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
         StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
             blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
@@ -1092,7 +1086,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   }
 
   @VisibleForTesting
-  public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() {
+  public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
     return storageMovementsMonitor;
   }
 
@@ -1109,7 +1103,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   /**
    * Clear queues for given track id.
    */
-  public void clearQueue(T trackId) {
+  public void clearQueue(long trackId) {
     storageMovementNeeded.clearQueue(trackId);
   }
 
@@ -1118,7 +1112,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
    * attempted or reported time stamp. This is used by
    * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
    */
-  final static class AttemptedItemInfo<T> extends ItemInfo<T> {
+  final static class AttemptedItemInfo extends ItemInfo {
     private long lastAttemptedOrReportedTime;
     private final Set<Block> blocks;
 
@@ -1136,7 +1130,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
      * @param retryCount
      *          file retry count
      */
-    AttemptedItemInfo(T rootId, T trackId,
+    AttemptedItemInfo(long rootId, long trackId,
         long lastAttemptedOrReportedTime,
         Set<Block> blocks, int retryCount) {
       super(rootId, trackId, retryCount);
@@ -1179,7 +1173,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   }
 
   @Override
-  public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) {
+  public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) {
     storageMovementNeeded.add(trackInfo, scanCompleted);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Added track info for inode {} to block "
@@ -1188,7 +1182,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   }
 
   @Override
-  public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
+  public void addAllFilesToProcess(long startPath, List<ItemInfo> itemInfoList,
       boolean scanCompleted) {
     getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
   }
@@ -1204,12 +1198,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   }
 
   @VisibleForTesting
-  public BlockStorageMovementNeeded<T> getStorageMovementQueue() {
+  public BlockStorageMovementNeeded getStorageMovementQueue() {
     return storageMovementNeeded;
   }
 
   @Override
-  public void markScanCompletedForPath(T inodeId) {
+  public void markScanCompletedForPath(long inodeId) {
     getStorageMovementQueue().markScanCompletedForDir(inodeId);
   }
 
@@ -1278,15 +1272,4 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         "It should be a positive, non-zero integer value.");
     return spsWorkMultiplier;
   }
-
-  /**
-   * Sets external listener for testing.
-   *
-   * @param blkMovementListener
-   *          block movement listener callback object
-   */
-  @VisibleForTesting
-  void setBlockMovementListener(BlockMovementListener blkMovementListener) {
-    storageMovementsMonitor.setBlockMovementListener(blkMovementListener);
-  }
 }
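
Note: with the <T> type parameter dropped, every work item above is keyed by
a plain long inode ID. A minimal construction sketch, using only the
AttemptedItemInfo signature shown in this hunk (the IDs and block are
illustrative values, not from any real namespace):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.util.Time;

    Set<Block> blocks = new HashSet<>();
    blocks.add(new Block(1000L));          // a block whose move was attempted
    AttemptedItemInfo item = new AttemptedItemInfo(
        16389L /* rootId: dir carrying the satisfy xattr */,
        16390L /* trackId: file being satisfied */,
        Time.monotonicNow(), blocks, 0 /* retryCount */);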

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
index 5ec0372..0507d6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
 public class StoragePolicySatisfyManager {
   private static final Logger LOG = LoggerFactory
       .getLogger(StoragePolicySatisfyManager.class);
-  private final StoragePolicySatisfier<Long> spsService;
+  private final StoragePolicySatisfier spsService;
   private final boolean storagePolicyEnabled;
   private volatile StoragePolicySatisfierMode mode;
   private final Queue<Long> pathsToBeTraveresed;
@@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager {
     pathsToBeTraveresed = new LinkedList<Long>();
     // instantiate the SPS service; it just keeps the config reference and
     // does not start any supporting threads.
-    spsService = new StoragePolicySatisfier<Long>(conf);
+    spsService = new StoragePolicySatisfier(conf);
     this.namesystem = namesystem;
     this.blkMgr = blkMgr;
   }
@@ -121,10 +121,7 @@ public class StoragePolicySatisfyManager {
       }
       // starts internal daemon service inside namenode
       spsService.init(
-          new IntraSPSNameNodeContext(namesystem, blkMgr, spsService),
-          new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(),
-              spsService),
-          new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null);
+          new IntraSPSNameNodeContext(namesystem, blkMgr, spsService));
       spsService.start(false, mode);
       break;
     case EXTERNAL:
@@ -221,13 +218,8 @@ public class StoragePolicySatisfyManager {
             mode);
         return;
       }
-      spsService.init(
-          new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService),
-          new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
-              spsService),
-          new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr,
-              this.namesystem),
-          null);
+      spsService.init(new IntraSPSNameNodeContext(this.namesystem, this.blkMgr,
+          spsService));
       spsService.start(true, newMode);
       break;
     case EXTERNAL:
@@ -309,7 +301,7 @@ public class StoragePolicySatisfyManager {
   /**
    * @return internal SPS service instance.
    */
-  public SPSService<Long> getInternalSPSService() {
+  public SPSService getInternalSPSService() {
     return this.spsService;
   }
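
Note: the single-argument init() above works because each Context
implementation now builds its own FileCollector, BlockMoveTaskHandler and
BlockMovementListener (ExternalSPSContext below does exactly this; the
intra-namenode context is assumed to follow suit). The internal-mode wiring
therefore reduces to, as in the hunks above:

    StoragePolicySatisfier sps = new StoragePolicySatisfier(conf);
    sps.init(new IntraSPSNameNodeContext(namesystem, blkMgr, sps));
    sps.start(false, mode);  // mode as tracked by this manager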
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 5ff6ffd..f80477b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -209,6 +209,6 @@ public interface NamenodeProtocol {
    *         by External SPS.
    */
   @AtMostOnce
-  String getNextSPSPath() throws IOException;
+  Long getNextSPSPath() throws IOException;
 }
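
Note: getNextSPSPath() now returns the inode ID as a Long rather than a path
string. A polling sketch composed from calls that appear elsewhere in this
patch (null is assumed when the translator sees an unset optional field):

    Long inodeId = nnc.getNNProtocolConnection().getNextSPSPath();
    if (inodeId != null) {
      // Resolve and traverse by ID; see ExternalSPSContext below.
      context.scanAndCollectFiles(inodeId);
    }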
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index f5225d2..3ea0294 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -76,11 +76,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
   private final SaslDataTransferClient saslClient;
   private final BlockStorageMovementTracker blkMovementTracker;
   private Daemon movementTrackerThread;
-  private final SPSService<String> service;
+  private final SPSService service;
   private final BlockDispatcher blkDispatcher;
 
   public ExternalSPSBlockMoveTaskHandler(Configuration conf,
-      NameNodeConnector nnc, SPSService<String> spsService) {
+      NameNodeConnector nnc, SPSService spsService) {
     int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
     moveExecutor = initializeBlockMoverThreadPool(moverThreads);
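
Note: with the generics removed, the handler is constructed against the raw
SPSService. A minimal wiring sketch (conf, nnc and sps as created elsewhere
in this patch):

    BlockMoveTaskHandler handler =
        new ExternalSPSBlockMoveTaskHandler(conf, nnc, sps);
    handler.submitMoveTask(blkMovingInfo);  // may throw IOException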

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index 1cd4664..189bc2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -27,6 +28,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -34,10 +37,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
@@ -49,17 +56,24 @@ import org.slf4j.LoggerFactory;
  * SPS from Namenode state.
  */
 @InterfaceAudience.Private
-public class ExternalSPSContext implements Context<String> {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(ExternalSPSContext.class);
-  private SPSService<String> service;
-  private NameNodeConnector nnc = null;
-  private BlockStoragePolicySuite createDefaultSuite =
+public class ExternalSPSContext implements Context {
+  public static final Logger LOG = LoggerFactory
+      .getLogger(ExternalSPSContext.class);
+  private final SPSService service;
+  private final NameNodeConnector nnc;
+  private final BlockStoragePolicySuite createDefaultSuite =
       BlockStoragePolicySuite.createDefaultSuite();
+  private final FileCollector fileCollector;
+  private final BlockMoveTaskHandler externalHandler;
+  private final BlockMovementListener blkMovementListener;
 
-  public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) {
+  public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
     this.service = service;
     this.nnc = nnc;
+    this.fileCollector = new ExternalSPSFilePathCollector(service);
+    this.externalHandler = new ExternalSPSBlockMoveTaskHandler(
+        service.getConf(), nnc, service);
+    this.blkMovementListener = new ExternalBlockMovementListener();
   }
 
   @Override
@@ -119,9 +133,10 @@ public class ExternalSPSContext implements Context<String> {
   }
 
   @Override
-  public boolean isFileExist(String filePath) {
+  public boolean isFileExist(long path) {
+    Path filePath = DFSUtilClient.makePathFromFileId(path);
     try {
-      return nnc.getDistributedFileSystem().exists(new Path(filePath));
+      return nnc.getDistributedFileSystem().exists(filePath);
     } catch (IllegalArgumentException | IOException e) {
       LOG.warn("Exception while getting file is for the given path:{}",
           filePath, e);
@@ -140,8 +155,9 @@ public class ExternalSPSContext implements Context<String> {
   }
 
   @Override
-  public void removeSPSHint(String inodeId) throws IOException {
-    nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId),
+  public void removeSPSHint(long inodeId) throws IOException {
+    Path filePath = DFSUtilClient.makePathFromFileId(inodeId);
+    nnc.getDistributedFileSystem().removeXAttr(filePath,
         HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
   }
 
@@ -157,11 +173,12 @@ public class ExternalSPSContext implements Context<String> {
   }
 
   @Override
-  public HdfsFileStatus getFileInfo(String path) throws IOException {
+  public HdfsFileStatus getFileInfo(long path) throws IOException {
     HdfsLocatedFileStatus fileInfo = null;
     try {
+      Path filePath = DFSUtilClient.makePathFromFileId(path);
       fileInfo = nnc.getDistributedFileSystem().getClient()
-          .getLocatedFileInfo(path, false);
+          .getLocatedFileInfo(filePath.toString(), false);
     } catch (FileNotFoundException e) {
       LOG.debug("Path:{} doesn't exists!", path, e);
     }
@@ -175,7 +192,7 @@ public class ExternalSPSContext implements Context<String> {
   }
 
   @Override
-  public String getNextSPSPath() {
+  public Long getNextSPSPath() {
     try {
       return nnc.getNNProtocolConnection().getNextSPSPath();
     } catch (IOException e) {
@@ -185,7 +202,7 @@ public class ExternalSPSContext implements Context<String> {
   }
 
   @Override
-  public void removeSPSPathId(String pathId) {
+  public void removeSPSPathId(long pathId) {
     // We need not specifically implement for external.
   }
 
@@ -193,4 +210,40 @@ public class ExternalSPSContext implements Context<String> {
   public void removeAllSPSPathIds() {
     // We need not specifically implement for external.
   }
-}
+
+  @Override
+  public void scanAndCollectFiles(long path)
+      throws IOException, InterruptedException {
+    fileCollector.scanAndCollectFiles(path);
+  }
+
+  @Override
+  public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
+    externalHandler.submitMoveTask(blkMovingInfo);
+  }
+
+  @Override
+  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+    // Notify the external listener if one is plugged in.
+    if (blkMovementListener != null) {
+      blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
+    }
+  }
+
+  /**
+   * An implementation of BlockMovementListener.
+   */
+  private static class ExternalBlockMovementListener
+      implements BlockMovementListener {
+
+    private List<Block> actualBlockMovements = new ArrayList<>();
+
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      for (Block block : moveAttemptFinishedBlks) {
+        actualBlockMovements.add(block);
+      }
+      LOG.info("Movement attempted blocks", actualBlockMovements);
+    }
+  }
+}
\ No newline at end of file
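
Note: DFSUtilClient.makePathFromFileId() is what lets this Context address
files by bare inode ID. It returns a path under HDFS's reserved inode
namespace; the ID below is illustrative:

    Path p = DFSUtilClient.makePathFromFileId(16389L);
    // p.toString() -> "/.reserved/.inodes/16389". The NameNode resolves
    // this reserved path back to the inode, so satisfier work in flight
    // survives a rename of the original file path.
    boolean exists = nnc.getDistributedFileSystem().exists(p);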

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
index 9435475..611ff65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -41,14 +42,14 @@ import org.slf4j.LoggerFactory;
  * representation.
  */
 @InterfaceAudience.Private
-public class ExternalSPSFilePathCollector implements FileCollector <String>{
+public class ExternalSPSFilePathCollector implements FileCollector {
   public static final Logger LOG =
       LoggerFactory.getLogger(ExternalSPSFilePathCollector.class);
   private DistributedFileSystem dfs;
-  private SPSService<String> service;
+  private SPSService service;
   private int maxQueueLimitToScan;
 
-  public ExternalSPSFilePathCollector(SPSService<String> service) {
+  public ExternalSPSFilePathCollector(SPSService service) {
     this.service = service;
     this.maxQueueLimitToScan = service.getConf().getInt(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
@@ -72,13 +73,13 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
    * Recursively scan the given path and add the file info to SPS service for
    * processing.
    */
-  private long processPath(String startID, String childPath) {
+  private long processPath(Long startID, String childPath) {
     long pendingWorkCount = 0; // to be satisfied file counter
     for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
       final DirectoryListing children;
       try {
-        children = dfs.getClient().listPaths(childPath, lastReturnedName,
-            false);
+        children = dfs.getClient().listPaths(childPath,
+            lastReturnedName, false);
       } catch (IOException e) {
         LOG.warn("Failed to list directory " + childPath
             + ". Ignore the directory and continue.", e);
@@ -93,18 +94,18 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
       }
 
       for (HdfsFileStatus child : children.getPartialListing()) {
-        String childFullPath = child.getFullName(childPath);
         if (child.isFile()) {
-          service.addFileToProcess(
-              new ItemInfo<String>(startID, childFullPath), false);
+          service.addFileToProcess(new ItemInfo(startID, child.getFileId()),
+              false);
           checkProcessingQueuesFree();
           pendingWorkCount++; // increment to be satisfied file count
         } else {
+          String childFullPathName = child.getFullName(childPath);
           if (child.isDirectory()) {
-            if (!childFullPath.endsWith(Path.SEPARATOR)) {
-              childFullPath = childFullPath + Path.SEPARATOR;
+            if (!childFullPathName.endsWith(Path.SEPARATOR)) {
+              childFullPathName = childFullPathName + Path.SEPARATOR;
             }
-            pendingWorkCount += processPath(startID, childFullPath);
+            pendingWorkCount += processPath(startID, childFullPathName);
           }
         }
       }
@@ -150,11 +151,12 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
   }
 
   @Override
-  public void scanAndCollectFiles(String path) throws IOException {
+  public void scanAndCollectFiles(long pathId) throws IOException {
     if (dfs == null) {
       dfs = getFS(service.getConf());
     }
-    long pendingSatisfyItemsCount = processPath(path, path);
+    Path filePath = DFSUtilClient.makePathFromFileId(pathId);
+    long pendingSatisfyItemsCount = processPath(pathId, filePath.toString());
     // Check whether the given path contains any items to be tracked or
     // no paths remain to be satisfied. If the list is empty, add the given
     // inodeId to 'pendingWorkForDirectory' with an empty list so that later
@@ -162,10 +164,10 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
     // this path can be treated as having already satisfied the storage policy.
     if (pendingSatisfyItemsCount <= 0) {
       LOG.debug("There is no pending items to satisfy the given path "
-          + "inodeId:{}", path);
-      service.addAllFilesToProcess(path, new ArrayList<>(), true);
+          + "inodeId:{}", pathId);
+      service.addAllFilesToProcess(pathId, new ArrayList<>(), true);
     } else {
-      service.markScanCompletedForPath(path);
+      service.markScanCompletedForPath(pathId);
     }
   }
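
Note: the collector now takes the root inode ID, converts it once to a
reserved-inode path for the listPaths() walk, and queues each discovered
file by its own getFileId(). Minimal usage sketch (sps as built in
ExternalStoragePolicySatisfier below; the ID is illustrative):

    FileCollector collector = new ExternalSPSFilePathCollector(sps);
    collector.scanAndCollectFiles(16389L);  // inode ID of the tree to scan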
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index 236b887..af90f0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
@@ -32,11 +31,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -68,8 +65,7 @@ public final class ExternalStoragePolicySatisfier {
       HdfsConfiguration spsConf = new HdfsConfiguration();
       // login with SPS keytab
       secureLogin(spsConf);
-      StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>(
-          spsConf);
+      StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
       nnc = getNameNodeConnector(spsConf);
 
       boolean spsRunning;
@@ -82,12 +78,7 @@ public final class ExternalStoragePolicySatisfier {
       }
 
       ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
-      ExternalBlockMovementListener blkMoveListener =
-          new ExternalBlockMovementListener();
-      ExternalSPSBlockMoveTaskHandler externalHandler =
-          new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
-      sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
-          blkMoveListener);
+      sps.init(context);
       sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
       if (sps != null) {
         sps.join();
@@ -132,21 +123,4 @@ public final class ExternalStoragePolicySatisfier {
       }
     }
   }
-
-  /**
-   * It is implementation of BlockMovementListener.
-   */
-  private static class ExternalBlockMovementListener
-      implements BlockMovementListener {
-
-    private List<Block> actualBlockMovements = new ArrayList<>();
-
-    @Override
-    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
-      for (Block block : moveAttemptFinishedBlks) {
-        actualBlockMovements.add(block);
-      }
-      LOG.info("Movement attempted blocks:{}", actualBlockMovements);
-    }
-  }
 }
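
Note: end to end, external startup now collapses to three calls; this sketch
is assembled directly from the hunks above (secureLogin and
getNameNodeConnector are this class's own helpers):

    StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
    NameNodeConnector nnc = getNameNodeConnector(spsConf);
    sps.init(new ExternalSPSContext(sps, nnc));
    sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
    sps.join();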

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index 2acc5a8..89edfbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -218,7 +218,7 @@ message GetNextSPSPathRequestProto {
 }
 
 message GetNextSPSPathResponseProto {
-  optional string spsPath = 1;
+  optional uint64 spsPath = 1;
 }
 
 /**
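
Note: uint64 maps to a Java long in the generated code, and the optional
marker is what lets the RPC signal an empty queue. A decoding sketch using
the protobuf-generated accessors for this field (the surrounding translator
code is assumed, not shown in this patch):

    GetNextSPSPathResponseProto resp = rpcProxy.getNextSPSPath(null,
        GetNextSPSPathRequestProto.newBuilder().build());
    Long next = resp.hasSpsPath() ? resp.getSpsPath() : null;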

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eabfc06/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index ed1fe92..f85769f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -45,22 +45,22 @@ import org.mockito.Mockito;
  */
 public class TestBlockStorageMovementAttemptedItems {
 
-  private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems;
-  private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles;
+  private BlockStorageMovementAttemptedItems bsmAttemptedItems;
+  private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles;
   private final int selfRetryTimeout = 500;
 
   @Before
   public void setup() throws Exception {
     Configuration config = new HdfsConfiguration();
-    Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
-    SPSService<Long> sps = new StoragePolicySatisfier<Long>(config);
+    Context ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
+    SPSService sps = new StoragePolicySatisfier(config);
     Mockito.when(ctxt.isRunning()).thenReturn(true);
     Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
     Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
     unsatisfiedStorageMovementFiles =
-        new BlockStorageMovementNeeded<Long>(ctxt, null);
-    bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps,
-        unsatisfiedStorageMovementFiles, null);
+        new BlockStorageMovementNeeded(ctxt);
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
+        unsatisfiedStorageMovementFiles, ctxt);
   }
 
   @After
@@ -76,7 +76,7 @@ public class TestBlockStorageMovementAttemptedItems {
     long stopTime = monotonicNow() + (retryTimeout * 2);
     boolean isItemFound = false;
     while (monotonicNow() < (stopTime)) {
-      ItemInfo<Long> ele = null;
+      ItemInfo ele = null;
       while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
         if (item == ele.getFile()) {
           isItemFound = true;
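
Note: the test wiring mirrors the new constructor shapes, straight from the
setup() hunk above: the needed-files queue now takes only the context, and
the attempted-items monitor takes the context in place of the old listener
argument:

    BlockStorageMovementNeeded queue = new BlockStorageMovementNeeded(ctxt);
    BlockStorageMovementAttemptedItems monitor =
        new BlockStorageMovementAttemptedItems(sps, queue, ctxt);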

