HDFS-13077. [SPS]: Fix review comments of external storage policy satisfier. 
Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/adc8d7a7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/adc8d7a7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/adc8d7a7

Branch: refs/heads/HDFS-10285
Commit: adc8d7a74ba6a9d109571c880fb1d958eeb0224a
Parents: d690c0b
Author: Surendra Singh Lilhore <surendralilh...@apache.org>
Authored: Mon Jan 29 23:59:55 2018 +0530
Committer: Uma Maheswara Rao Gangumalla <umamah...@apache.org>
Committed: Thu Aug 9 20:47:23 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  14 +-
 .../server/blockmanagement/BlockManager.java    |  33 +++-
 .../namenode/FSDirSatisfyStoragePolicyOp.java   |  15 ++
 .../hdfs/server/namenode/FSNamesystem.java      |  41 ++--
 .../hdfs/server/namenode/NameNodeRpcServer.java |  11 ++
 .../hdfs/server/namenode/sps/SPSPathIds.java    |   8 +-
 .../namenode/sps/StoragePolicySatisfier.java    |   6 +-
 .../hdfs/server/sps/ExternalSPSContext.java     |   4 +
 .../sps/ExternalStoragePolicySatisfier.java     |  30 ++-
 .../sps/TestStoragePolicySatisfier.java         |   7 +-
 .../sps/TestExternalStoragePolicySatisfier.java | 195 ++++++++++++++++++-
 11 files changed, 323 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bf29d14..b354d64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -614,7 +614,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
   public static final int    DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
-  // SPS related configurations
+  // StoragePolicySatisfier (SPS) related configurations
   public static final String  DFS_STORAGE_POLICY_SATISFIER_MODE_KEY =
       "dfs.storage.policy.satisfier.mode";
   public static final String DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT =
@@ -643,6 +643,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.storage.policy.satisfier.low.max-streams.preference";
   public static final boolean DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT =
       true;
+  public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
+      "dfs.storage.policy.satisfier.max.outstanding.paths";
+  public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;
+
+  // SPS keytab configurations; keytab-based login is disabled by default.
+  public static final String  DFS_SPS_ADDRESS_KEY =
+      "dfs.storage.policy.satisfier.address";
+  public static final String  DFS_SPS_ADDRESS_DEFAULT = "0.0.0.0:0";
+  public static final String  DFS_SPS_KEYTAB_FILE_KEY =
+      "dfs.storage.policy.satisfier.keytab.file";
+  public static final String  DFS_SPS_KERBEROS_PRINCIPAL_KEY =
+      "dfs.storage.policy.satisfier.kerberos.principal";
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 9866;

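The four keys added above are ordinary Hadoop configuration properties, so they can be set in hdfs-site.xml or programmatically before starting the external satisfier. A minimal sketch using the Configuration API (the keytab path, principal, and limit value are illustrative placeholders, not values from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public final class SpsConfExample {
      /** Builds a Configuration carrying the new SPS keys from this commit. */
      public static Configuration spsConf() {
        Configuration conf = new Configuration();
        // Cap on paths queued in the NameNode for the satisfier (default 10000).
        conf.setInt(DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, 10000);
        // Kerberos settings consumed by ExternalStoragePolicySatisfier#secureLogin;
        // the keytab path and principal below are hypothetical placeholders.
        conf.set(DFSConfigKeys.DFS_SPS_ADDRESS_KEY, "0.0.0.0:0");
        conf.set(DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY, "/etc/security/keytabs/sps.keytab");
        conf.set(DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY, "sps/_HOST@EXAMPLE.COM");
        return conf;
      }
    }
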
http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4ea64a3..9205910 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -439,6 +439,7 @@ public class BlockManager implements BlockStatsMXBean {
   private final boolean storagePolicyEnabled;
   private StoragePolicySatisfierMode spsMode;
   private SPSPathIds spsPaths;
+  private final int spsOutstandingPathsLimit;
 
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@@ -478,14 +479,16 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
         * 1000L);
-
+    // StoragePolicySatisfier (SPS) configs
     storagePolicyEnabled =
         conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
-    String spsModeVal =
-        conf.get(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+    String spsModeVal = conf.get(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+    spsOutstandingPathsLimit = conf.getInt(
+        DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
+        DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
     spsMode = StoragePolicySatisfierMode.fromString(spsModeVal);
     spsPaths = new SPSPathIds();
     sps = new StoragePolicySatisfier(conf);
@@ -5188,6 +5191,12 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
       String path) throws IOException {
+    if (spsMode != StoragePolicySatisfierMode.INTERNAL) {
+      LOG.debug("Satisfier is not running inside namenode, so status "
+          + "can't be returned.");
+      throw new IOException("Satisfier is not running inside namenode, "
+          + "so status can't be returned.");
+    }
     return sps.checkStoragePolicySatisfyPathStatus(path);
   }
 
@@ -5207,6 +5216,20 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Verify that the outstanding SPS path queue has not exceeded the allowed limit.
+   */
+  public void verifyOutstandingSPSPathQLimit() throws IOException {
+    long size = spsPaths.size();
+    // Check whether the SPS call queue has reached the allowed limit.
+    if (spsOutstandingPathsLimit - size <= 0) {
+      LOG.debug("Satisfier Q - outstanding limit:{}, current size:{}",
+          spsOutstandingPathsLimit, size);
+      throw new IOException("Outstanding satisfier queue limit: "
+          + spsOutstandingPathsLimit + " exceeded, try later!");
+    }
+  }
+
+  /**
    * Removes the SPS path id from the list of sps paths.
    */
   public void removeSPSPathId(long trackId) {

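On the client side, the new outstanding-path limit from verifyOutstandingSPSPathQLimit surfaces as a plain IOException from DistributedFileSystem#satisfyStoragePolicy, as the test testOutstandingQueueLimitExceeds further below demonstrates. A minimal caller sketch that backs off and retries (the retry loop, attempt count, and sleep interval are assumptions for illustration, not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public final class SatisfyWithRetry {
      /** Submits the path to SPS, retrying while the outstanding queue is full. */
      static void satisfy(DistributedFileSystem dfs, Path path)
          throws IOException, InterruptedException {
        for (int attempt = 0; attempt < 5; attempt++) {
          try {
            dfs.satisfyStoragePolicy(path);
            return;
          } catch (IOException e) {
            // Message thrown by BlockManager#verifyOutstandingSPSPathQLimit:
            // "Outstanding satisfier queue limit: <n> exceeded, try later!"
            String msg = e.getMessage();
            if (msg == null || !msg.contains("Outstanding satisfier queue limit")) {
              throw e; // some other failure, e.g. storage policy disabled
            }
            Thread.sleep(1000L); // arbitrary back-off before retrying
          }
        }
        throw new IOException("SPS queue still full after retries for " + path);
      }
    }
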
http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index eed6e52..5ffd6e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -45,6 +45,21 @@ final class FSDirSatisfyStoragePolicyOp {
   private FSDirSatisfyStoragePolicyOp() {
   }
 
+  /**
+   * Satisfy storage policy function, which adds the entry to the SPS call queue
+   * and performs the satisfaction asynchronously.
+   *
+   * @param fsd
+   *          fs directory
+   * @param bm
+   *          block manager
+   * @param src
+   *          source path
+   * @param logRetryCache
+   *          whether to record RPC ids in editlog for retry cache rebuilding
+   * @return file status info
+   * @throws IOException
+   */
   static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
       String src, boolean logRetryCache) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index c93791b..5dfec25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2253,28 +2253,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "satisfyStoragePolicy";
     FileStatus auditStat;
+    validateStoragePolicySatisfy();
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot satisfy storage policy for " + src);
-      // make sure storage policy is enabled, otherwise
-      // there is no need to satisfy storage policy.
-      if (!dir.isStoragePolicyEnabled()) {
-        throw new IOException(String.format(
-            "Failed to satisfy storage policy since %s is set to false.",
-            DFS_STORAGE_POLICY_ENABLED_KEY));
-      }
-
-      if (!blockManager.isSPSEnabled()
-          || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
-              && !blockManager.getStoragePolicySatisfier().isRunning())) {
-        throw new UnsupportedActionException(
-            "Cannot request to satisfy storage policy "
-                + "when storage policy satisfier feature has been disabled"
-                + " by admin. Seek for an admin help to enable it "
-                + "or use Mover tool.");
-      }
       auditStat = FSDirSatisfyStoragePolicyOp.satisfyStoragePolicy(
           dir, blockManager, src, logRetryCache);
     } catch (AccessControlException e) {
@@ -2287,6 +2271,29 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     logAuditEvent(true, operationName, src, null, auditStat);
   }
 
+  private void validateStoragePolicySatisfy()
+      throws UnsupportedActionException, IOException {
+    // make sure storage policy is enabled, otherwise
+    // there is no need to satisfy storage policy.
+    if (!dir.isStoragePolicyEnabled()) {
+      throw new IOException(String.format(
+          "Failed to satisfy storage policy since %s is set to false.",
+          DFS_STORAGE_POLICY_ENABLED_KEY));
+    }
+    // checks sps status
+    if (!blockManager.isSPSEnabled()
+        || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
+            && !blockManager.getStoragePolicySatisfier().isRunning())) {
+      throw new UnsupportedActionException(
+          "Cannot request to satisfy storage policy "
+              + "when storage policy satisfier feature has been disabled"
+              + " by admin. Seek for an admin help to enable it "
+              + "or use Mover tool.");
+    }
+    // Check whether the SPS queue has too many outstanding requests.
+    blockManager.verifyOutstandingSPSPathQLimit();
+  }
+
   /**
    * unset storage policy set for a given file or a directory.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0e50965..d74dc9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -2578,6 +2579,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
+    // Check that internal SPS service is running
+    if (namesystem.getBlockManager()
+        .getSPSMode() == StoragePolicySatisfierMode.INTERNAL
+        && namesystem.getBlockManager().getSPSService().isRunning()) {
+      LOG.debug("SPS service is internally enabled and running inside "
+          + "namenode, so external SPS is not allowed to fetch the path Ids");
+      throw new IOException("SPS service is internally enabled and running"
+          + " inside namenode, so external SPS is not allowed to fetch"
+          + " the path Ids");
+    }
     return namesystem.getBlockManager().getNextSPSPathId();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
index e0f4999..6c0f8b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class SPSPathIds {
 
   // List of pending dir to satisfy the policy
-  // TODO: Make this bounded queue.
   private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
 
   /**
@@ -61,4 +60,11 @@ public class SPSPathIds {
   public synchronized Long pollNext() {
     return spsDirsToBeTraveresed.poll();
   }
+
+  /**
+   * @return the size of the queue.
+   */
+  public synchronized long size() {
+    return spsDirsToBeTraveresed.size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 4ddfe2e..87faced 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -91,7 +91,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   private int blockMovementMaxRetry;
   private Context ctxt;
   private BlockMoveTaskHandler blockMoveTaskHandler;
-  private Configuration conf;
+  private final Configuration conf;
 
   public StoragePolicySatisfier(Configuration conf) {
     this.conf = conf;
@@ -441,8 +441,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
             liveDns, ecPolicy);
         if (blocksPaired) {
           status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
-        } else
-          if (status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
+        } else if (status !=
+            BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
           // Check if the previous block was successfully paired. Here the
           // status will set to NO_BLOCKS_TARGETS_PAIRED only when none of the
           // blocks of a file found its eligible targets to satisfy the storage

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index e3b3bbb..c309209 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -175,6 +175,10 @@ public class ExternalSPSContext implements Context {
   @Override
   public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
       long estimatedSize) {
+    // TODO: Instead of calling the namenode to check the available space, this
+    // could be optimized by maintaining a local cache of the datanode storage
+    // reports and doing the computation locally. The cache could be refreshed
+    // per file or periodically.
     try {
       return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
           estimatedSize);

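The TODO above sketches an optimization: keep a local cache of datanode storage reports in the external SPS instead of asking the NameNode for every scheduling check. A hypothetical shape of such a cache, written only to illustrate the TODO (the class name, API, and refresh policy are assumptions, not code in this commit):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    /** Hypothetical cache of remaining capacity per datanode and storage type. */
    class DatanodeSpaceCache {
      private final Map<String, Map<StorageType, Long>> remaining =
          new ConcurrentHashMap<>();
      private final long refreshIntervalMs;
      private volatile long lastRefreshTime;

      DatanodeSpaceCache(long refreshIntervalMs) {
        this.refreshIntervalMs = refreshIntervalMs;
      }

      /** Records the remaining bytes for one datanode and storage type. */
      void update(DatanodeInfo dn, StorageType type, long remainingBytes) {
        remaining.computeIfAbsent(dn.getDatanodeUuid(),
            k -> new ConcurrentHashMap<>()).put(type, remainingBytes);
        lastRefreshTime = System.currentTimeMillis();
      }

      /** True if the cached report says the datanode can hold estimatedSize bytes. */
      boolean hasSpace(DatanodeInfo dn, StorageType type, long estimatedSize) {
        Map<StorageType, Long> perType = remaining.get(dn.getDatanodeUuid());
        Long free = perType == null ? null : perType.get(type);
        return free != null && free >= estimatedSize;
      }

      /** True when the cache is older than the refresh interval and should be rebuilt. */
      boolean isStale() {
        return System.currentTimeMillis() - lastRefreshTime > refreshIntervalMs;
      }
    }

Under this sketch, checkDNSpaceForScheduling would consult hasSpace first and fall back to the NameNode call only when the cache is stale.
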
http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index c64abc3..59935b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.sps;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -28,6 +29,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -36,6 +38,9 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,20 +49,25 @@ import org.slf4j.LoggerFactory;
  * This class starts and runs external SPS service.
  */
 @InterfaceAudience.Private
-public class ExternalStoragePolicySatisfier {
+public final class ExternalStoragePolicySatisfier {
   public static final Logger LOG = LoggerFactory
       .getLogger(ExternalStoragePolicySatisfier.class);
 
+  private ExternalStoragePolicySatisfier() {
+    // This is just a class to start and run external sps.
+  }
+
   /**
    * Main method to start SPS service.
    */
-  public static void main(String args[]) throws Exception {
+  public static void main(String[] args) throws Exception {
     NameNodeConnector nnc = null;
     try {
       StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
           LOG);
       HdfsConfiguration spsConf = new HdfsConfiguration();
-      //TODO : login with SPS keytab
+      // login with SPS keytab
+      secureLogin(spsConf);
       StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
       nnc = getNameNodeConnector(spsConf);
 
@@ -92,6 +102,18 @@ public class ExternalStoragePolicySatisfier {
     }
   }
 
+  private static void secureLogin(Configuration conf)
+      throws IOException {
+    UserGroupInformation.setConfiguration(conf);
+    String addr = conf.get(DFSConfigKeys.DFS_SPS_ADDRESS_KEY,
+        DFSConfigKeys.DFS_SPS_ADDRESS_DEFAULT);
+    InetSocketAddress socAddr = NetUtils.createSocketAddr(addr, 0,
+        DFSConfigKeys.DFS_SPS_ADDRESS_KEY);
+    SecurityUtil.login(conf, DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY,
+        DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY,
+        socAddr.getHostName());
+  }
+
   private static NameNodeConnector getNameNodeConnector(Configuration conf)
       throws IOException, InterruptedException {
     final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -100,7 +122,7 @@ public class ExternalStoragePolicySatisfier {
       try {
         final List<NameNodeConnector> nncs = NameNodeConnector
             .newNameNodeConnectors(namenodes,
-                StoragePolicySatisfier.class.getSimpleName(),
+                ExternalStoragePolicySatisfier.class.getSimpleName(),
                 externalSPSPathId, conf,
                 NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
         return nncs.get(0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 135d996..9e0a39f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -134,8 +134,9 @@ public class TestStoragePolicySatisfier {
    *
    * @throws IOException
    */
-  public void getFS() throws IOException {
+  public DistributedFileSystem getFS() throws IOException {
     this.dfs = hdfsCluster.getFileSystem();
+    return this.dfs;
   }
 
   @After
@@ -423,9 +424,9 @@ public class TestStoragePolicySatisfier {
                 + "for %s since %s is set to false.",
             FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
       } catch (IOException e) {
-        Assert.assertTrue(e.getMessage().contains(String.format(
+        GenericTestUtils.assertExceptionContains(String.format(
             "Failed to satisfy storage policy since %s is set to false.",
-            DFS_STORAGE_POLICY_ENABLED_KEY)));
+            DFS_STORAGE_POLICY_ENABLED_KEY), e);
       }
 
       hdfsCluster.getConfiguration(0).

http://git-wip-us.apache.org/repos/asf/hadoop/blob/adc8d7a7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index febc2ea..15a4271 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -17,17 +17,40 @@
  */
 package org.apache.hadoop.hdfs.server.sps;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
@@ -39,8 +62,17 @@ import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Ignore;
+import org.junit.Test;
 
 /**
  * Tests the external sps service plugins.
@@ -52,6 +84,18 @@ public class TestExternalStoragePolicySatisfier
           {StorageType.DISK, StorageType.DISK},
           {StorageType.DISK, StorageType.DISK}};
   private NameNodeConnector nnc;
+  private File keytabFile;
+  private String principal;
+  private MiniKdc kdc;
+  private File baseDir;
+
+  @After
+  public void destroy() throws Exception {
+    if (kdc != null) {
+      kdc.stop();
+      FileUtil.fullyDelete(baseDir);
+    }
+  }
 
   @Override
   public void setUp() {
@@ -178,20 +222,157 @@ public class TestExternalStoragePolicySatisfier
     }
   }
 
+  private void initSecureConf(Configuration conf) throws Exception {
+    String username = "externalSPS";
+    baseDir = GenericTestUtils
+        .getTestDir(TestExternalStoragePolicySatisfier.class.getSimpleName());
+    FileUtil.fullyDelete(baseDir);
+    Assert.assertTrue(baseDir.mkdirs());
+
+    Properties kdcConf = MiniKdc.createConf();
+    kdc = new MiniKdc(kdcConf, baseDir);
+    kdc.start();
+
+    SecurityUtil.setAuthenticationMethod(
+        UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+    UserGroupInformation.setConfiguration(conf);
+    KerberosName.resetDefaultRealm();
+    Assert.assertTrue("Expected configuration to enable security",
+        UserGroupInformation.isSecurityEnabled());
+
+    keytabFile = new File(baseDir, username + ".keytab");
+    String keytab = keytabFile.getAbsolutePath();
+    // Windows will not reverse name lookup "127.0.0.1" to "localhost".
+    String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
+    principal = username + "/" + krbInstance + "@" + kdc.getRealm();
+    String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
+    kdc.createPrincipal(keytabFile, username, username + "/" + krbInstance,
+        "HTTP/" + krbInstance);
+
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+
+    conf.set(DFS_SPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_SPS_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_SPS_KERBEROS_PRINCIPAL_KEY, principal);
+
+    String keystoresDir = baseDir.getAbsolutePath();
+    String sslConfDir = KeyStoreTestUtil
+        .getClasspathDir(TestExternalStoragePolicySatisfier.class);
+    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
+
+    conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
+        KeyStoreTestUtil.getClientSSLConfigFileName());
+    conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+        KeyStoreTestUtil.getServerSSLConfigFileName());
+
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+  }
+
   /**
-   * This test need not run as external scan is not a batch based scanning right
-   * now.
+   * Test SPS runs fine when logging in with a keytab in kerberized env. Reusing
+   * testWhenStoragePolicySetToALLSSD here for basic functionality testing.
    */
-  @Ignore("ExternalFileIdCollector is not batch based right now."
-      + " So, ignoring it.")
-  public void testBatchProcessingForSPSDirectory() throws Exception {
+  @Test(timeout = 300000)
+  public void testWithKeytabs() throws Exception {
+    try {
+      initSecureConf(getConf());
+      final UserGroupInformation ugi = UserGroupInformation
+          .loginUserFromKeytabAndReturnUGI(principal,
+              keytabFile.getAbsolutePath());
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          // verify that sps runs Ok.
+          testWhenStoragePolicySetToALLSSD();
+          // verify that UGI was logged in using keytab.
+          Assert.assertTrue(UserGroupInformation.isLoginKeytabBased());
+          return null;
+        }
+      });
+    } finally {
+      // Reset UGI so that other tests are not affected.
+      UserGroupInformation.reset();
+      UserGroupInformation.setConfiguration(new Configuration());
+    }
   }
 
   /**
-   * Status won't be supported for external SPS, now. So, ignoring it.
+   * Test verifies that an SPS call throws an exception when the call queue
+   * exceeds the configured outstanding queue limit.
+   *
+   * @throws Exception
    */
-  @Ignore("Status is not supported for external SPS. So, ignoring it.")
+  @Test(timeout = 300000)
+  public void testOutstandingQueueLimitExceeds() throws Exception {
+    try {
+      getConf().setInt(DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, 3);
+      createCluster();
+      List<String> files = new ArrayList<>();
+      files.add(FILE);
+      DistributedFileSystem fs = getFS();
+      BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
+          .getBlockManager();
+      SPSService spsService = blkMgr.getSPSService();
+      spsService.stopGracefully(); // stops SPS
+
+      // Creates 4 more files. Send all of them for satisfying the storage
+      // policy together.
+      for (int i = 0; i < 3; i++) {
+        String file1 = "/testOutstandingQueueLimitExceeds_" + i;
+        files.add(file1);
+        writeContent(file1);
+        fs.satisfyStoragePolicy(new Path(file1));
+      }
+      String fileExceeds = "/testOutstandingQueueLimitExceeds_" + 4;
+      files.add(fileExceeds);
+      writeContent(fileExceeds);
+      try {
+        fs.satisfyStoragePolicy(new Path(fileExceeds));
+        Assert.fail("Should throw exception as it exceeds "
+            + "outstanding SPS call Q limit");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "Outstanding satisfier queue limit: 3 exceeded, try later!", ioe);
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test verifies status check when Satisfier is not running inside namenode.
+   */
+  @Test(timeout = 90000)
   public void testStoragePolicySatisfyPathStatus() throws Exception {
+    createCluster();
+    DistributedFileSystem fs = getFS();
+    try {
+      fs.getClient().checkStoragePolicySatisfyPathStatus(FILE);
+      Assert.fail("Should throw exception as SPS is not running inside NN!");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("Satisfier is not running"
+          + " inside namenode, so status can't be returned.", e);
+    }
+  }
+
+  /**
+   * This test need not run as external scan is not a batch based scanning right
+   * now.
+   */
+  @Ignore("ExternalFileIdCollector is not batch based right now."
+      + " So, ignoring it.")
+  public void testBatchProcessingForSPSDirectory() throws Exception {
   }
 
   /**

