HDFS-13075. [SPS]: Provide External Context implementation. Contributed by Uma Maheswara Rao G.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/16820957
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/16820957
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/16820957

Branch: refs/heads/HDFS-10285
Commit: 16820957b5b0e0d1771b01aa911113f9628681dc
Parents: 4e89833
Author: Surendra Singh Lilhore <surendralilh...@apache.org>
Authored: Sun Jan 28 20:46:56 2018 +0530
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Tue Jul 31 12:10:31 2018 +0530

----------------------------------------------------------------------
 .../NamenodeProtocolServerSideTranslatorPB.java |  67 +++++
 .../NamenodeProtocolTranslatorPB.java           |  58 ++++
 .../hdfs/server/balancer/NameNodeConnector.java |  28 +-
 .../server/blockmanagement/BlockManager.java    |  19 ++
 .../server/blockmanagement/DatanodeManager.java |  18 ++
 .../hdfs/server/common/HdfsServerConstants.java |   3 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  53 +++-
 .../sps/BlockStorageMovementNeeded.java         |   8 +-
 .../hdfs/server/namenode/sps/Context.java       |   9 +-
 .../namenode/sps/IntraSPSNameNodeContext.java   |  23 +-
 .../namenode/sps/StoragePolicySatisfier.java    |  15 +-
 .../hdfs/server/protocol/NamenodeProtocol.java  |  46 +++-
 .../hdfs/server/sps/ExternalSPSContext.java     | 271 +++++++++++++++++++
 .../src/main/proto/NamenodeProtocol.proto       |  57 ++++
 .../sps/TestExternalStoragePolicySatisfier.java |  31 +--
 15 files changed, 652 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
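
For orientation before the per-file diffs: the sketch below shows roughly how an external SPS process is expected to drive the four NamenodeProtocol RPCs this patch adds (getNextSPSPathId, getFilePath, checkDNSpaceForScheduling and hasLowRedundancyBlocks). It is only a sketch; the committed driver is StoragePolicySatisfier running on top of the new ExternalSPSContext, the polling loop and 3 second back-off below are assumptions, and all four RPCs require superuser privilege on the NameNode.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;

// Illustrative external SPS polling loop, not the committed driver logic.
public class ExternalSpsPollerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Helper added in this patch; it turns off the "another instance
    // running" check so the mover id file is not claimed by this process.
    NameNodeConnector nnc = ExternalSPSContext.getNameNodeConnector(conf);
    NamenodeProtocol nn = nnc.getNNProtocolConnection();
    while (true) {
      // Null means there is no pending SPS path at the moment.
      Long fileId = nn.getNextSPSPathId();
      if (fileId == null) {
        Thread.sleep(3000); // assumed back-off, mirrors the 3s retry in SPS
        continue;
      }
      String path = nn.getFilePath(fileId);
      boolean lowRedundancy = nn.hasLowRedundancyBlocks(fileId);
      System.out.println("Next SPS path " + path + ", lowRedundancy="
          + lowRedundancy);
      // Block moves would then be scheduled; each candidate target is
      // validated with nn.checkDNSpaceForScheduling(dn, type, estimatedSize).
    }
  }
}

In the patch itself this wiring lives in ExternalSPSContext, which creates the NameNodeConnector lazily and exposes these RPC results through the common Context interface.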


http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index 90c2c49..25eafdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
@@ -33,10 +35,16 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -257,4 +265,63 @@ public class NamenodeProtocolServerSideTranslatorPB implements
     return IsRollingUpgradeResponseProto.newBuilder()
         .setIsRollingUpgrade(isRollingUpgrade).build();
   }
+
+  @Override
+  public GetNextSPSPathIdResponseProto getNextSPSPathId(
+      RpcController controller, GetNextSPSPathIdRequestProto request)
+          throws ServiceException {
+    try {
+      Long nextSPSPathId = impl.getNextSPSPathId();
+      if (nextSPSPathId == null) {
+        return GetNextSPSPathIdResponseProto.newBuilder().build();
+      }
+      return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetFilePathResponseProto getFilePath(RpcController controller,
+      GetFilePathRequestProto request) throws ServiceException {
+    try {
+      return GetFilePathResponseProto.newBuilder()
+          .setSrcPath(impl.getFilePath(request.getFileId())).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public CheckDNSpaceResponseProto checkDNSpaceForScheduling(
+      RpcController controller, CheckDNSpaceRequestProto request)
+          throws ServiceException {
+    try {
+      CheckDNSpaceResponseProto build = CheckDNSpaceResponseProto.newBuilder()
+          .setIsGoodDatanodeWithSpace(impl.checkDNSpaceForScheduling(
+              PBHelperClient.convert(request.getDnInfo()),
+              PBHelperClient.convertStorageType(request.getStorageType()),
+              request.getEstimatedSize()))
+          .build();
+      return build;
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks(
+      RpcController controller, HasLowRedundancyBlocksRequestProto request)
+          throws ServiceException {
+    try {
+      return HasLowRedundancyBlocksResponseProto.newBuilder()
+          .setHasLowRedundancyBlocks(
+              impl.hasLowRedundancyBlocks(request.getInodeId()))
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 632f8b7..8bff499 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -22,18 +22,24 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -263,4 +269,56 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public Long getNextSPSPathId() throws IOException {
+    GetNextSPSPathIdRequestProto req =
+        GetNextSPSPathIdRequestProto.newBuilder().build();
+    try {
+      GetNextSPSPathIdResponseProto nextSPSPathId =
+          rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req);
+      return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public String getFilePath(Long inodeId) throws IOException {
+    GetFilePathRequestProto req =
+        GetFilePathRequestProto.newBuilder().setFileId(inodeId).build();
+    try {
+      return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
+      long estimatedSize) throws IOException {
+    CheckDNSpaceRequestProto req = CheckDNSpaceRequestProto.newBuilder()
+        .setDnInfo(PBHelperClient.convert(dn))
+        .setStorageType(PBHelperClient.convertStorageType(type))
+        .setEstimatedSize(estimatedSize).build();
+    try {
+      return rpcProxy.checkDNSpaceForScheduling(NULL_CONTROLLER, req)
+          .getIsGoodDatanodeWithSpace();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
+    HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto
+        .newBuilder().setInodeId(inodeId).build();
+    try {
+      return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req)
+          .getHasLowRedundancyBlocks();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 6bfbbb3..2b3c193 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -66,7 +66,8 @@ public class NameNodeConnector implements Closeable {
 
   public static final int DEFAULT_MAX_IDLE_ITERATIONS = 5;
   private static boolean write2IdFile = true;
-  
+  private static boolean checkOtherInstanceRunning = true;
+
   /** Create {@link NameNodeConnector} for the given namenodes. */
   public static List<NameNodeConnector> newNameNodeConnectors(
       Collection<URI> namenodes, String name, Path idPath, Configuration conf,
@@ -101,6 +102,11 @@ public class NameNodeConnector implements Closeable {
     NameNodeConnector.write2IdFile = write2IdFile;
   }
 
+  @VisibleForTesting
+  public static void checkOtherInstanceRunning(boolean toCheck) {
+    NameNodeConnector.checkOtherInstanceRunning = toCheck;
+  }
+
   private final URI nameNodeUri;
   private final String blockpoolID;
 
@@ -111,7 +117,7 @@ public class NameNodeConnector implements Closeable {
 
   private final DistributedFileSystem fs;
   private final Path idPath;
-  private final OutputStream out;
+  private OutputStream out;
   private final List<Path> targetPaths;
   private final AtomicLong bytesMoved = new AtomicLong();
 
@@ -141,10 +147,12 @@ public class NameNodeConnector implements Closeable {
     this.keyManager = new KeyManager(blockpoolID, namenode,
         defaults.getEncryptDataTransfer(), conf);
     // if it is for test, we do not create the id file
-    out = checkAndMarkRunning();
-    if (out == null) {
-      // Exit if there is another one running.
-      throw new IOException("Another " + name + " is running.");
+    if (checkOtherInstanceRunning) {
+      out = checkAndMarkRunning();
+      if (out == null) {
+        // Exit if there is another one running.
+        throw new IOException("Another " + name + " is running.");
+      }
     }
   }
 
@@ -285,13 +293,19 @@ public class NameNodeConnector implements Closeable {
     IOUtils.closeStream(out); 
     if (fs != null) {
       try {
-        fs.delete(idPath, true);
+        if (checkOtherInstanceRunning) {
+          fs.delete(idPath, true);
+        }
       } catch(IOException ioe) {
         LOG.warn("Failed to delete " + idPath, ioe);
       }
     }
   }
 
+  public NamenodeProtocol getNNProtocolConnection() {
+    return this.namenode;
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 9348769..daaa7a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -5013,6 +5013,25 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Check whether file id has low redundancy blocks.
+   *
+   * @param inodeID
+   *          - inode id
+   */
+  public boolean hasLowRedundancyBlocks(long inodeID) {
+    namesystem.readLock();
+    try {
+      BlockCollection bc = namesystem.getBlockCollection(inodeID);
+      if (bc == null) {
+        return false;
+      }
+      return hasLowRedundancyBlocks(bc);
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  /**
    * Gets the storage policy satisfier instance.
    *
    * @return sps

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index c24a38b..3542864 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -2067,5 +2068,22 @@ public class DatanodeManager {
     }
     return reports;
   }
+
+  public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
+      StorageType type, long estimatedSize) {
+    namesystem.readLock();
+    try {
+      DatanodeDescriptor datanode =
+          blockManager.getDatanodeManager().getDatanode(dn.getDatanodeUuid());
+      if (datanode == null) {
+        LOG.debug("Target datanode: " + dn + " doesn't exist");
+        return false;
+      }
+      return null != datanode.chooseStorage4Block(type, estimatedSize);
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 42a2fc6..1378de2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -365,8 +365,7 @@ public interface HdfsServerConstants {
   String XATTR_ERASURECODING_POLICY =
       "system.hdfs.erasurecoding.policy";
 
-  String XATTR_SATISFY_STORAGE_POLICY =
-      "system.hdfs.satisfy.storage.policy";
+  String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps.xattr";
 
   Path MOVER_ID_PATH = new Path("/system/mover.id");
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 4738bf5..0e50965 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2537,10 +2537,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override
   public boolean isStoragePolicySatisfierRunning() throws IOException {
     checkNNStartup();
+    String operationName = "isStoragePolicySatisfierRunning";
+    namesystem.checkSuperuserPrivilege(operationName);
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
-    return namesystem.getBlockManager().isStoragePolicySatisfierRunning();
+    boolean isSPSRunning =
+        namesystem.getBlockManager().isStoragePolicySatisfierRunning();
+    namesystem.logAuditEvent(true, operationName, null);
+    return isSPSRunning;
   }
 
   @Override
@@ -2553,4 +2558,50 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus(
         path);
   }
+
+  @Override
+  public String getFilePath(Long inodeId) throws IOException {
+    checkNNStartup();
+    String operationName = "getFilePath";
+    namesystem.checkSuperuserPrivilege(operationName);
+    if (nn.isStandbyState()) {
+      throw new StandbyException("Not supported by Standby Namenode.");
+    }
+    return namesystem.getFilePath(inodeId);
+  }
+
+  @Override
+  public Long getNextSPSPathId() throws IOException {
+    checkNNStartup();
+    String operationName = "getNextSPSPathId";
+    namesystem.checkSuperuserPrivilege(operationName);
+    if (nn.isStandbyState()) {
+      throw new StandbyException("Not supported by Standby Namenode.");
+    }
+    return namesystem.getBlockManager().getNextSPSPathId();
+  }
+
+  @Override
+  public boolean checkDNSpaceForScheduling(DatanodeInfo dn,
+      StorageType type, long estimatedSize) throws IOException {
+    checkNNStartup();
+    String operationName = "checkDNSpaceForScheduling";
+    namesystem.checkSuperuserPrivilege(operationName);
+    if (nn.isStandbyState()) {
+      throw new StandbyException("Not supported by Standby Namenode.");
+    }
+    return namesystem.getBlockManager().getDatanodeManager()
+        .verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize);
+  }
+
+  @Override
+  public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
+    checkNNStartup();
+    String operationName = "hasLowRedundancyBlocks";
+    namesystem.checkSuperuserPrivilege(operationName);
+    if (nn.isStandbyState()) {
+      throw new StandbyException("Not supported by Standby Namenode.");
+    }
+    return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 39c50a7..8a10183 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -319,12 +319,16 @@ public class BlockStorageMovementNeeded {
           String reClass = t.getClass().getName();
           if (InterruptedException.class.getName().equals(reClass)) {
             LOG.info("SPSPathIdProcessor thread is interrupted. Stopping..");
-            Thread.currentThread().interrupt();
             break;
           }
           LOG.warn("Exception while scanning file inodes to satisfy the policy",
               t);
-          // TODO: may be we should retry the current inode id?
+          try {
+            Thread.sleep(3000);
+          } catch (InterruptedException e) {
+            LOG.info("Interrupted while waiting in SPSPathIdProcessor", t);
+            break;
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index f103dfe..bddbc1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -149,8 +149,8 @@ public interface Context {
   * @return true if the given datanode has sufficient space to occupy blockSize
    *         data, false otherwise.
    */
-  boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
-      StorageType type, long blockSize);
+  boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
+      long blockSize);
 
   /**
    * @return next SPS path id to process.
@@ -175,4 +175,9 @@ public interface Context {
    */
   String getFilePath(Long inodeId);
 
+  /**
+   * Close the resources.
+   */
+  void close() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index c658812..191886c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -98,17 +97,8 @@ public class IntraSPSNameNodeContext implements Context {
   }
 
   @Override
-  public boolean hasLowRedundancyBlocks(long inodeID) {
-    namesystem.readLock();
-    try {
-      BlockCollection bc = namesystem.getBlockCollection(inodeID);
-      if (bc == null) {
-        return false;
-      }
-      return blockManager.hasLowRedundancyBlocks(bc);
-    } finally {
-      namesystem.readUnlock();
-    }
+  public boolean hasLowRedundancyBlocks(long inodeId) {
+    return blockManager.hasLowRedundancyBlocks(inodeId);
   }
 
   @Override
@@ -170,8 +160,8 @@ public class IntraSPSNameNodeContext implements Context {
   }
 
   @Override
-  public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
-      StorageType type, long blockSize) {
+  public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
+      long blockSize) {
     namesystem.readLock();
     try {
       DatanodeDescriptor datanode = blockManager.getDatanodeManager()
@@ -205,4 +195,9 @@ public class IntraSPSNameNodeContext implements Context {
   public String getFilePath(Long inodeId) {
     return namesystem.getFilePath(inodeId);
   }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing to clean.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 33ad6f4..89799fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -325,6 +325,9 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
               }
             }
           }
+        } else {
+          LOG.info("Namenode is in safemode. It will retry again.");
+          Thread.sleep(3000);
         }
         int numLiveDn = ctxt.getNumLiveDataNodes();
         if (storageMovementNeeded.size() == 0
@@ -706,8 +709,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
       DatanodeInfo source, List<StorageType> targetTypes) {
     for (StorageType t : targetTypes) {
-      boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
-          source, t, blockInfo.getBlockSize());
+      boolean goodTargetDn =
+          ctxt.checkDNSpaceForScheduling(source, t, blockInfo.getBlockSize());
       if (goodTargetDn) {
         return new StorageTypeNodePair(t, source);
       }
@@ -720,8 +723,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
       StorageTypeNodeMap locsForExpectedStorageTypes,
       List<DatanodeInfo> excludeNodes) {
     for (StorageType t : targetTypes) {
-      List<DatanodeInfo> nodesWithStorages = locsForExpectedStorageTypes
-          .getNodesWithStorages(t);
+      List<DatanodeInfo> nodesWithStorages =
+          locsForExpectedStorageTypes.getNodesWithStorages(t);
       if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
         continue; // no target nodes with the required storage type.
       }
@@ -729,8 +732,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
       for (DatanodeInfo target : nodesWithStorages) {
         if (!excludeNodes.contains(target)
             && matcher.match(ctxt.getNetworkTopology(), source, target)) {
-          boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
-              target, t, block.getBlockSize());
+          boolean goodTargetDn =
+              ctxt.checkDNSpaceForScheduling(target, t, block.getBlockSize());
           if (goodTargetDn) {
             return new StorageTypeNodePair(t, target);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 0c8adc6..9f5cadd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -31,7 +32,8 @@ import org.apache.hadoop.security.KerberosInfo;
 
 /*****************************************************************************
  * Protocol that a secondary NameNode uses to communicate with the NameNode.
- * It's used to get part of the name node state
+ * Also used by external storage policy satisfier. It's used to get part of the
+ * name node state
  *****************************************************************************/
 @KerberosInfo(
     serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
@@ -202,5 +204,47 @@ public interface NamenodeProtocol {
    */
   @Idempotent
   boolean isRollingUpgrade() throws IOException;
+
+  /**
+   * Gets the file path for the given file id. This API is used by External
+   * SPS.
+   *
+   * @param inodeId
+   *          - file inode id.
+   * @return path
+   */
+  @Idempotent
+  String getFilePath(Long inodeId) throws IOException;
+
+  /**
+   * @return the next available SPS path id, otherwise null. This API is used
+   *         by External SPS.
+   */
+  @AtMostOnce
+  Long getNextSPSPathId() throws IOException;
+
+  /**
+   * Verifies whether the given datanode has enough space of the given storage
+   * type to schedule a block of the estimated size. This API is used by
+   * External SPS.
+   *
+   * @param dn
+   *          - datanode
+   * @param type
+   *          - storage type
+   * @param estimatedSize
+   *          - size
+   */
+  @Idempotent
+  boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
+      long estimatedSize) throws IOException;
+
+  /**
+   * Check whether the given file id has any low redundancy blocks. This API
+   * is used by External SPS.
+   *
+   * @param inodeID
+   *          - inode id.
+   */
+  @Idempotent
+  boolean hasLowRedundancyBlocks(long inodeID) throws IOException;
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
new file mode 100644
index 0000000..e5b04ba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.sps;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessControlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to connect to the Namenode and fetch the information
+ * required by SPS from the Namenode state.
+ */
+@InterfaceAudience.Private
+public class ExternalSPSContext implements Context {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ExternalSPSContext.class);
+  private SPSService service;
+  private NameNodeConnector nnc = null;
+  private Object nnConnectionLock = new Object();
+  private BlockStoragePolicySuite createDefaultSuite =
+      BlockStoragePolicySuite.createDefaultSuite();
+
+  public ExternalSPSContext(SPSService service) {
+    this.service = service;
+    initializeNamenodeConnector();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return service.isRunning();
+  }
+
+  @Override
+  public boolean isInSafeMode() {
+    initializeNamenodeConnector();
+    try {
+      return nnc != null ? nnc.getDistributedFileSystem().isInSafeMode()
+          : false;
+    } catch (IOException e) {
+      LOG.warn("Exception while checking the Namenode safemode status.", e);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean isMoverRunning() {
+    initializeNamenodeConnector();
+    try {
+      FSDataOutputStream out = nnc.getDistributedFileSystem()
+          .append(HdfsServerConstants.MOVER_ID_PATH);
+      out.close();
+      return false;
+    } catch (IOException ioe) {
+      LOG.warn("Exception while checking mover is running..", ioe);
+      return true;
+    }
+
+  }
+
+  @Override
+  public long getFileID(String path) throws UnresolvedLinkException,
+      AccessControlException, ParentNotDirectoryException {
+    initializeNamenodeConnector();
+    HdfsFileStatus fs = null;
+    try {
+      fs = (HdfsFileStatus) nnc.getDistributedFileSystem().getFileStatus(
+          new Path(path));
+      LOG.info("Fetched the fileID:{} for the path:{}", fs.getFileId(), path);
+    } catch (IllegalArgumentException | IOException e) {
+      LOG.warn("Exception while getting file id for the given path:{}.", path,
+          e);
+    }
+    return fs != null ? fs.getFileId() : 0;
+  }
+
+  @Override
+  public NetworkTopology getNetworkTopology() {
+    return NetworkTopology.getInstance(service.getConf());
+  }
+
+  @Override
+  public boolean isFileExist(long inodeId) {
+    initializeNamenodeConnector();
+    String filePath = null;
+    try {
+      filePath = getFilePath(inodeId);
+      return nnc.getDistributedFileSystem().exists(new Path(filePath));
+    } catch (IllegalArgumentException | IOException e) {
+      LOG.warn("Exception while checking existence of file for the given path:{} "
+          + "and fileId:{}", filePath, inodeId, e);
+    }
+    return false;
+  }
+
+  @Override
+  public BlockStoragePolicy getStoragePolicy(byte policyId) {
+    return createDefaultSuite.getPolicy(policyId);
+  }
+
+  @Override
+  public void addDropPreviousSPSWorkAtDNs() {
+    // Nothing todo
+  }
+
+  @Override
+  public void removeSPSHint(long inodeId) throws IOException {
+    initializeNamenodeConnector();
+    nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
+        HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
+  }
+
+  @Override
+  public int getNumLiveDataNodes() {
+    initializeNamenodeConnector();
+    try {
+      return nnc.getDistributedFileSystem()
+          .getDataNodeStats(DatanodeReportType.LIVE).length;
+    } catch (IOException e) {
+      LOG.warn("Exception while getting number of live datanodes.", e);
+    }
+    return 0;
+  }
+
+  @Override
+  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+    initializeNamenodeConnector();
+    return nnc.getDistributedFileSystem().getClient()
+        .getLocatedFileInfo(getFilePath(inodeID), false);
+  }
+
+  @Override
+  public DatanodeStorageReport[] getLiveDatanodeStorageReport()
+      throws IOException {
+    initializeNamenodeConnector();
+    return nnc.getLiveDatanodeStorageReport();
+  }
+
+  @Override
+  public boolean hasLowRedundancyBlocks(long inodeID) {
+    initializeNamenodeConnector();
+    try {
+      return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
+    } catch (IOException e) {
+      LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.",
+          inodeID, e);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
+      long estimatedSize) {
+    initializeNamenodeConnector();
+    try {
+      return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
+          estimatedSize);
+    } catch (IOException e) {
+      LOG.warn("Failed to verify whether the given datanode:{} has "
+          + "enough space for the estimated size.", dn, e);
+      return false;
+    }
+  }
+
+  @Override
+  public Long getNextSPSPathId() {
+    initializeNamenodeConnector();
+    try {
+      return nnc.getNNProtocolConnection().getNextSPSPathId();
+    } catch (IOException e) {
+      LOG.warn("Exception while getting next sps path id from Namenode.", e);
+      return null;
+    }
+  }
+
+  @Override
+  public void removeSPSPathId(long pathId) {
+    // We need not specifically implement for external.
+  }
+
+  @Override
+  public void removeAllSPSPathIds() {
+    // We need not specifically implement for external.
+  }
+
+  @Override
+  public String getFilePath(Long inodeId) {
+    try {
+      return nnc.getNNProtocolConnection().getFilePath(inodeId);
+    } catch (IOException e) {
+      LOG.warn("Exception while getting file path id:{} from Namenode.",
+          inodeId, e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (nnConnectionLock) {
+      if (nnc != null) {
+        nnc.close();
+      }
+    }
+  }
+
+  private void initializeNamenodeConnector() {
+    synchronized (nnConnectionLock) {
+      if (nnc == null) {
+        try {
+          nnc = getNameNodeConnector(service.getConf());
+        } catch (IOException e) {
+          LOG.warn("Exception while creating Namenode Connector.."
+              + "Namenode might not have started.", e);
+        }
+      }
+    }
+  }
+
+  public static NameNodeConnector getNameNodeConnector(Configuration conf)
+      throws IOException {
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    List<NameNodeConnector> nncs = Collections.emptyList();
+    NameNodeConnector.checkOtherInstanceRunning(false);
+    nncs = NameNodeConnector.newNameNodeConnectors(namenodes,
+        ExternalSPSContext.class.getSimpleName(),
+        HdfsServerConstants.MOVER_ID_PATH, conf,
+        NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+    return nncs.get(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index 683dc80..b0e900d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -206,6 +206,39 @@ message IsRollingUpgradeResponseProto {
   required bool isRollingUpgrade = 1;
 }
 
+message GetFilePathRequestProto {
+  required uint64 fileId = 1;
+}
+
+message GetFilePathResponseProto {
+  required string srcPath = 1;
+}
+
+message GetNextSPSPathIdRequestProto {
+}
+
+message GetNextSPSPathIdResponseProto {
+  optional uint64 fileId = 1;
+}
+
+message CheckDNSpaceRequestProto {
+  required DatanodeInfoProto dnInfo = 1;
+  required StorageTypeProto storageType = 2;
+  required uint64 estimatedSize = 3;
+}
+
+message CheckDNSpaceResponseProto {
+  required bool isGoodDatanodeWithSpace = 1;
+}
+
+message HasLowRedundancyBlocksRequestProto {
+  required uint64 inodeId = 1;
+}
+
+message HasLowRedundancyBlocksResponseProto {
+  required bool hasLowRedundancyBlocks = 1;
+}
+
 /**
  * Protocol used by the sub-ordinate namenode to send requests
  * the active/primary namenode.
@@ -287,4 +320,28 @@ service NamenodeProtocolService {
    */
   rpc isRollingUpgrade(IsRollingUpgradeRequestProto)
       returns (IsRollingUpgradeResponseProto);
+
+  /**
+   * Return the corresponding file path for the given file id
+   */
+  rpc getFilePath(GetFilePathRequestProto)
+      returns (GetFilePathResponseProto);
+
+  /**
+   * Return the sps path id from namenode
+   */
+  rpc getNextSPSPathId(GetNextSPSPathIdRequestProto)
+      returns (GetNextSPSPathIdResponseProto);
+
+  /**
+   * Verify whether the given datanode has space to schedule the block move
+   */
+  rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto)
+      returns (CheckDNSpaceResponseProto);
+
+  /**
+   * Check whether the given file id has low redundancy blocks.
+   */
+  rpc hasLowRedundancyBlocks(HasLowRedundancyBlocksRequestProto)
+      returns (HasLowRedundancyBlocksResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16820957/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 42b04da..fe08b8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.Context;
 import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
@@ -96,14 +95,8 @@ public class TestExternalStoragePolicySatisfier
     SPSService spsService = blkMgr.getSPSService();
     spsService.stopGracefully();
 
-    // TODO: Since External is not fully implemented, just used INTERNAL now.
-    // Need to set External context here.
-    IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
-        cluster.getNameNode().getNamesystem(), blkMgr, blkMgr.getSPSService()) {
-      public boolean isRunning() {
-        return true;
-      };
-    };
+    ExternalSPSContext context = new ExternalSPSContext(spsService);
+
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
     ExternalSPSBlockMoveTaskHandler externalHandler =
@@ -131,15 +124,7 @@ public class TestExternalStoragePolicySatisfier
     spsService = blkMgr.getSPSService();
     spsService.stopGracefully();
 
-    // TODO: Since External is not fully implemented, just used INTERNAL now.
-    // Need to set External context here.
-    IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
-        getCluster().getNameNode().getNamesystem(), blkMgr,
-        blkMgr.getSPSService()) {
-      public boolean isRunning() {
-        return true;
-      };
-    };
+    ExternalSPSContext context = new ExternalSPSContext(spsService);
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
     ExternalSPSBlockMoveTaskHandler externalHandler =
@@ -180,7 +165,7 @@ public class TestExternalStoragePolicySatisfier
     for (URI nn : namenodes) {
       nnMap.put(nn, null);
     }
-    final Path externalSPSPathId = new Path("/system/externalSPS.id");
+    final Path externalSPSPathId = new Path("/system/tmp.id");
     final List<NameNodeConnector> nncs = NameNodeConnector
         .newNameNodeConnectors(nnMap,
             StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
@@ -205,6 +190,14 @@ public class TestExternalStoragePolicySatisfier
   }
 
   /**
+   * This test case is more specific to internal.
+   */
+  @Ignore("This test is specific to internal, so skipping here.")
+  public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
+      throws Exception {
+  }
+
+  /**
    * Status won't be supported for external SPS, now. So, ignoring it.
    */
   @Ignore("Status is not supported for external SPS. So, ignoring it.")

