Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 c2bbe22c5 -> ec1b445c6


HDFS-13136. Avoid taking FSN lock while doing group member lookup for FSD 
permission check. Contributed by Xiaoyu Yao.

(cherry picked from commit e978e1d9064fc794ea83217d0af2315d4167211e)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ec1b445c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ec1b445c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ec1b445c

Branch: refs/heads/branch-3.0
Commit: ec1b445c62b84e028a6c7db96848d106cf3c1165
Parents: c2bbe22
Author: Xiaoyu Yao <x...@apache.org>
Authored: Thu Feb 15 00:02:05 2018 -0800
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Thu Feb 22 20:18:02 2018 -0800

----------------------------------------------------------------------
 .../server/namenode/EncryptionZoneManager.java  |   3 +-
 .../hadoop/hdfs/server/namenode/FSDirAclOp.java |  28 +--
 .../hdfs/server/namenode/FSDirAppendOp.java     |   2 +-
 .../hdfs/server/namenode/FSDirAttrOp.java       |  54 ++---
 .../hdfs/server/namenode/FSDirConcatOp.java     |   5 +-
 .../hdfs/server/namenode/FSDirDeleteOp.java     |   8 +-
 .../server/namenode/FSDirEncryptionZoneOp.java  |  12 +-
 .../hdfs/server/namenode/FSDirMkdirOp.java      |   3 +-
 .../hdfs/server/namenode/FSDirRenameOp.java     |  11 +-
 .../hdfs/server/namenode/FSDirSnapshotOp.java   |  33 +--
 .../server/namenode/FSDirStatAndListingOp.java  |  45 ++---
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |   4 +-
 .../hdfs/server/namenode/FSDirXAttrOp.java      |  35 ++--
 .../hdfs/server/namenode/FSNamesystem.java      | 201 ++++++++++++-------
 .../hdfs/server/namenode/NameNodeAdapter.java   |   8 +-
 .../hdfs/server/namenode/TestAuditLogger.java   |   3 +-
 .../namenode/TestAuditLoggerWithCommands.java   |   7 +-
 17 files changed, 261 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index 3fcf797..176ae1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -154,9 +154,10 @@ public class EncryptionZoneManager {
   public void pauseForTestingAfterNthCheckpoint(final String zone,
       final int count) throws IOException {
     INodesInPath iip;
+    final FSPermissionChecker pc = dir.getPermissionChecker();
     dir.readLock();
     try {
-      iip = dir.resolvePath(dir.getPermissionChecker(), zone, DirOp.READ);
+      iip = dir.resolvePath(pc, zone, DirOp.READ);
     } finally {
       dir.readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
index cc51430..7b3471d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java
@@ -36,11 +36,10 @@ import java.util.List;
 
 class FSDirAclOp {
   static FileStatus modifyAclEntries(
-      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      List<AclEntry> aclSpec) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -61,11 +60,10 @@ class FSDirAclOp {
   }
 
   static FileStatus removeAclEntries(
-      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      List<AclEntry> aclSpec) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -85,11 +83,10 @@ class FSDirAclOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static FileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg)
-      throws IOException {
+  static FileStatus removeDefaultAcl(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -109,11 +106,10 @@ class FSDirAclOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static FileStatus removeAcl(FSDirectory fsd, final String srcArg)
-      throws IOException {
+  static FileStatus removeAcl(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -129,11 +125,10 @@ class FSDirAclOp {
   }
 
   static FileStatus setAcl(
-      FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      List<AclEntry> aclSpec) throws IOException {
     String src = srcArg;
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -148,9 +143,8 @@ class FSDirAclOp {
   }
 
   static AclStatus getAclStatus(
-      FSDirectory fsd, String src) throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     checkAclsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
       INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
index 9926ee0..dad975b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -148,7 +148,7 @@ final class FSDirAppendOp {
       fsd.writeUnlock();
     }
 
-    HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, iip);
+    HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, pc, iip);
     if (lb != null) {
       NameNode.stateChangeLog.debug(
           "DIR* NameSystem.appendFile: file {} for {} at {} block {} block"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 0dfaa8e..1c9e54b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -51,12 +51,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
 
 public class FSDirAttrOp {
   static FileStatus setPermission(
-      FSDirectory fsd, final String src, FsPermission permission)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String src,
+      FsPermission permission) throws IOException {
     if (FSDirectory.isExactReservedName(src)) {
       throw new InvalidPathException(src);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -71,12 +70,11 @@ public class FSDirAttrOp {
   }
 
   static FileStatus setOwner(
-      FSDirectory fsd, String src, String username, String group)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src, String username,
+      String group) throws IOException {
     if (FSDirectory.isExactReservedName(src)) {
       throw new InvalidPathException(src);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -101,10 +99,8 @@ public class FSDirAttrOp {
   }
 
   static FileStatus setTimes(
-      FSDirectory fsd, String src, long mtime, long atime)
-      throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
-
+      FSDirectory fsd, FSPermissionChecker pc, String src, long mtime,
+      long atime) throws IOException {
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -129,11 +125,10 @@ public class FSDirAttrOp {
   }
 
   static boolean setReplication(
-      FSDirectory fsd, BlockManager bm, String src, final short replication)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, BlockManager bm, String src,
+      final short replication) throws IOException {
     bm.verifyReplication(src, replication, null);
     final boolean isFile;
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
       final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
@@ -153,33 +148,31 @@ public class FSDirAttrOp {
     return isFile;
   }
 
-  static FileStatus unsetStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src) throws IOException {
-    return setStoragePolicy(fsd, bm, src,
+  static FileStatus unsetStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
+      BlockManager bm, String src) throws IOException {
+    return setStoragePolicy(fsd, pc, bm, src,
         HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, "unset");
   }
 
-  static FileStatus setStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src, final String policyName) throws IOException {
+  static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
+      BlockManager bm, String src, final String policyName) throws IOException {
     // get the corresponding policy and make sure the policy name is valid
     BlockStoragePolicy policy = bm.getStoragePolicy(policyName);
     if (policy == null) {
       throw new HadoopIllegalArgumentException(
           "Cannot find a block policy with the name " + policyName);
     }
-
-    return setStoragePolicy(fsd, bm, src, policy.getId(), "set");
+    return setStoragePolicy(fsd, pc, bm, src, policy.getId(), "set");
   }
 
-  static FileStatus setStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src, final byte policyId, final String operation)
+  static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
+      BlockManager bm, String src, final byte policyId, final String operation)
       throws IOException {
     if (!fsd.isStoragePolicyEnabled()) {
       throw new IOException(String.format(
           "Failed to %s storage policy since %s is set to false.", operation,
           DFS_STORAGE_POLICY_ENABLED_KEY));
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -202,9 +195,8 @@ public class FSDirAttrOp {
     return bm.getStoragePolicies();
   }
 
-  static BlockStoragePolicy getStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String path) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static BlockStoragePolicy getStoragePolicy(FSDirectory fsd,
+      FSPermissionChecker pc, BlockManager bm, String path) throws IOException {
     fsd.readLock();
     try {
       final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ_LINK);
@@ -222,9 +214,8 @@ public class FSDirAttrOp {
     }
   }
 
-  static long getPreferredBlockSize(FSDirectory fsd, String src)
-      throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static long getPreferredBlockSize(FSDirectory fsd, FSPermissionChecker pc,
+      String src) throws IOException {
     fsd.readLock();
     try {
       final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
@@ -240,9 +231,8 @@ public class FSDirAttrOp {
    *
    * Note: This does not support ".inodes" relative path.
    */
-  static void setQuota(FSDirectory fsd, String src, long nsQuota, long ssQuota,
-      StorageType type) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static void setQuota(FSDirectory fsd, FSPermissionChecker pc, String src,
+      long nsQuota, long ssQuota, StorageType type) throws IOException {
     if (fsd.isPermissionEnabled()) {
       pc.checkSuperuserPrivilege();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index 4cc5389..b423a95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -48,14 +48,13 @@ import static org.apache.hadoop.util.Time.now;
  */
 class FSDirConcatOp {
 
-  static FileStatus concat(FSDirectory fsd, String target, String[] srcs,
-    boolean logRetryCache) throws IOException {
+  static FileStatus concat(FSDirectory fsd, FSPermissionChecker pc,
+      String target, String[] srcs, boolean logRetryCache) throws IOException {
     validatePath(target, srcs);
     assert srcs != null;
     if (FSDirectory.LOG.isDebugEnabled()) {
       FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath targetIIP = fsd.resolvePath(pc, target, DirOp.WRITE);
     // write permission for the target
     if (fsd.isPermissionEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
index a83a8b6..1fbb564 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -88,6 +88,7 @@ class FSDirDeleteOp {
    * For small directory or file the deletion is done in one shot.
    *
    * @param fsn namespace
+   * @param pc FS permission checker
    * @param src path name to be deleted
    * @param recursive boolean true to apply to all sub-directories recursively
    * @param logRetryCache whether to record RPC ids in editlog for retry cache
@@ -96,10 +97,9 @@ class FSDirDeleteOp {
    * @throws IOException
    */
   static BlocksMapUpdateInfo delete(
-      FSNamesystem fsn, String src, boolean recursive, boolean logRetryCache)
-      throws IOException {
+      FSNamesystem fsn, FSPermissionChecker pc, String src, boolean recursive,
+      boolean logRetryCache) throws IOException {
     FSDirectory fsd = fsn.getFSDirectory();
-    FSPermissionChecker pc = fsd.getPermissionChecker();
 
     if (FSDirectory.isExactReservedName(src)) {
       throw new InvalidPathException(src);
@@ -130,7 +130,7 @@ class FSDirDeleteOp {
    * <br>
    *
    * @param fsd the FSDirectory instance
-   * @param src a string representation of a path to an inode
+   * @param iip inodes of a path to be deleted
    * @param mtime the time the inode is removed
    */
   static void deleteForEditLog(FSDirectory fsd, INodesInPath iip, long mtime)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
index 943e60d..9fbdaeb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java
@@ -693,11 +693,12 @@ final class FSDirEncryptionZoneOp {
    * a different ACL. HDFS should not try to operate on additional ACLs, but
    * rather use the generate ACL it already has.
    */
-  static String getCurrentKeyVersion(final FSDirectory dir, final String zone)
-      throws IOException {
+  static String getCurrentKeyVersion(final FSDirectory dir,
+      final FSPermissionChecker pc, final String zone) throws IOException {
     assert dir.getProvider() != null;
     assert !dir.hasReadLock();
-    final String keyName = FSDirEncryptionZoneOp.getKeyNameForZone(dir, zone);
+    final String keyName = FSDirEncryptionZoneOp.getKeyNameForZone(dir,
+        pc, zone);
     if (keyName == null) {
       throw new IOException(zone + " is not an encryption zone.");
     }
@@ -719,11 +720,10 @@ final class FSDirEncryptionZoneOp {
    * Resolve the zone to an inode, find the encryption zone info associated with
    * that inode, and return the key name. Does not contact the KMS.
    */
-  static String getKeyNameForZone(final FSDirectory dir, final String zone)
-      throws IOException {
+  static String getKeyNameForZone(final FSDirectory dir,
+      final FSPermissionChecker pc, final String zone) throws IOException {
     assert dir.getProvider() != null;
     final INodesInPath iip;
-    final FSPermissionChecker pc = dir.getPermissionChecker();
     dir.readLock();
     try {
       iip = dir.resolvePath(pc, zone, DirOp.READ);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index 89fd8a3..45bb6b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -39,13 +39,12 @@ import static org.apache.hadoop.util.Time.now;
 
 class FSDirMkdirOp {
 
-  static FileStatus mkdirs(FSNamesystem fsn, String src,
+  static FileStatus mkdirs(FSNamesystem fsn, FSPermissionChecker pc, String src,
       PermissionStatus permissions, boolean createParent) throws IOException {
     FSDirectory fsd = fsn.getFSDirectory();
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.writeLock();
     try {
       INodesInPath iip = fsd.resolvePath(pc, src, DirOp.CREATE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index bbbb724..efc8da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -47,14 +47,12 @@ import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooL
 class FSDirRenameOp {
   @Deprecated
   static RenameResult renameToInt(
-      FSDirectory fsd, final String src, final String dst,
-      boolean logRetryCache)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, final String src,
+      final String dst, boolean logRetryCache) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
           " to " + dst);
     }
-    FSPermissionChecker pc = fsd.getPermissionChecker();
 
     // Rename does not operate on link targets
     // Do not resolveLink when checking permissions of src and dst
@@ -230,8 +228,8 @@ class FSDirRenameOp {
    * The new rename which has the POSIX semantic.
    */
   static RenameResult renameToInt(
-      FSDirectory fsd, final String srcArg, final String dstArg,
-      boolean logRetryCache, Options.Rename... options)
+      FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
+      final String dstArg, boolean logRetryCache, Options.Rename... options)
       throws IOException {
     String src = srcArg;
     String dst = dstArg;
@@ -239,7 +237,6 @@ class FSDirRenameOp {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options -" +
           " " + src + " to " + dst);
     }
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
 
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     // returns resolved path

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
index 9dd75bc..4c48db0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSnapshotOp.java
@@ -78,14 +78,17 @@ class FSDirSnapshotOp {
 
   /**
    * Create a snapshot
+   * @param fsd FS directory
+   * @param pc FS permission checker
    * @param snapshotRoot The directory path where the snapshot is taken
    * @param snapshotName The name of the snapshot
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding.
    */
   static String createSnapshot(
-      FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
-      String snapshotName, boolean logRetryCache)
+      FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager,
+      String snapshotRoot, String snapshotName, boolean logRetryCache)
       throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
       fsd.checkOwner(pc, iip);
@@ -113,10 +116,9 @@ class FSDirSnapshotOp {
     return snapshotPath;
   }
 
-  static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
-      String path, String snapshotOldName, String snapshotNewName,
-      boolean logRetryCache) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static void renameSnapshot(FSDirectory fsd, FSPermissionChecker pc,
+      SnapshotManager snapshotManager, String path, String snapshotOldName,
+      String snapshotNewName, boolean logRetryCache) throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
       fsd.checkOwner(pc, iip);
@@ -134,8 +136,8 @@ class FSDirSnapshotOp {
   }
 
   static SnapshottableDirectoryStatus[] getSnapshottableDirListing(
-      FSDirectory fsd, SnapshotManager snapshotManager) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+      FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager)
+      throws IOException {
     fsd.readLock();
     try {
       final String user = pc.isSuperUser()? null : pc.getUser();
@@ -146,10 +148,9 @@ class FSDirSnapshotOp {
   }
 
   static SnapshotDiffReport getSnapshotDiffReport(FSDirectory fsd,
-      SnapshotManager snapshotManager, String path,
+      FSPermissionChecker pc, SnapshotManager snapshotManager, String path,
       String fromSnapshot, String toSnapshot) throws IOException {
     SnapshotDiffReport diffs;
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
     fsd.readLock();
     try {
       INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ);
@@ -199,15 +200,19 @@ class FSDirSnapshotOp {
 
   /**
    * Delete a snapshot of a snapshottable directory
+   * @param fsd The FS directory
+   * @param pc The permission checker
+   * @param snapshotManager The snapshot manager
    * @param snapshotRoot The snapshottable directory
    * @param snapshotName The name of the to-be-deleted snapshot
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding.
    * @throws IOException
    */
   static INode.BlocksMapUpdateInfo deleteSnapshot(
-      FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
-      String snapshotName, boolean logRetryCache)
+      FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager,
+      String snapshotRoot, String snapshotName, boolean logRetryCache)
       throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
     if (fsd.isPermissionEnabled()) {
       fsd.checkOwner(pc, iip);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 8cff3d4..73e5acb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -51,9 +51,9 @@ import java.util.EnumSet;
 import static org.apache.hadoop.util.Time.now;
 
 class FSDirStatAndListingOp {
-  static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
-      byte[] startAfter, boolean needLocation) throws IOException {
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
+  static DirectoryListing getListingInt(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg, byte[] startAfter, boolean needLocation)
+      throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
 
     // Get file name when startAfter is an INodePath.  This is not the
@@ -85,7 +85,8 @@ class FSDirStatAndListingOp {
 
   /**
    * Get the file info for a specific file.
-   *
+   * @param fsd The FS directory
+   * @param pc The permission checker
    * @param srcArg The string representation of the path to the file
    * @param resolveLink whether to throw UnresolvedLinkException
    *        if src refers to a symlink
@@ -93,11 +94,9 @@ class FSDirStatAndListingOp {
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  static HdfsFileStatus getFileInfo(
-      FSDirectory fsd, String srcArg, boolean resolveLink)
-      throws IOException {
+  static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
+      String srcArg, boolean resolveLink) throws IOException {
     DirOp dirOp = resolveLink ? DirOp.READ : DirOp.READ_LINK;
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final INodesInPath iip;
     if (pc.isSuperUser()) {
       // superuser can only get an ACE if an existing ancestor is a file.
@@ -111,25 +110,24 @@ class FSDirStatAndListingOp {
     } else {
       iip = fsd.resolvePath(pc, srcArg, dirOp);
     }
-    return getFileInfo(fsd, iip);
+    return getFileInfo(fsd, pc, iip);
   }
 
   /**
    * Returns true if the file is closed
    */
-  static boolean isFileClosed(FSDirectory fsd, String src) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+  static boolean isFileClosed(FSDirectory fsd, FSPermissionChecker pc,
+      String src) throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     return !INodeFile.valueOf(iip.getLastINode(), src).isUnderConstruction();
   }
 
   static ContentSummary getContentSummary(
-      FSDirectory fsd, String src) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
     // getContentSummaryInt() call will check access (if enabled) when
     // traversing all sub directories.
-    return getContentSummaryInt(fsd, iip);
+    return getContentSummaryInt(fsd, pc, iip);
   }
 
   /**
@@ -322,12 +320,13 @@ class FSDirStatAndListingOp {
 
   /** Get the file info for a specific file.
    * @param fsd FSDirectory
+   * @param pc The permission checker
    * @param iip The path to the file, the file is included
    * @param includeStoragePolicy whether to include storage policy
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  static HdfsFileStatus getFileInfo(FSDirectory fsd,
+  static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
       INodesInPath iip, boolean includeStoragePolicy) throws IOException {
     fsd.readLock();
     try {
@@ -344,8 +343,8 @@ class FSDirStatAndListingOp {
     }
   }
 
-  static HdfsFileStatus getFileInfo(FSDirectory fsd, INodesInPath iip)
-    throws IOException {
+  static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
+      INodesInPath iip) throws IOException {
     fsd.readLock();
     try {
       HdfsFileStatus status = null;
@@ -356,7 +355,7 @@ class FSDirStatAndListingOp {
           status = FSDirectory.DOT_SNAPSHOT_DIR_STATUS;
         }
       } else {
-        status = getFileInfo(fsd, iip, true);
+        status = getFileInfo(fsd, pc, iip, true);
       }
       return status;
     } finally {
@@ -507,7 +506,7 @@ class FSDirStatAndListingOp {
   }
 
   private static ContentSummary getContentSummaryInt(FSDirectory fsd,
-      INodesInPath iip) throws IOException {
+      FSPermissionChecker pc, INodesInPath iip) throws IOException {
     fsd.readLock();
     try {
       INode targetNode = iip.getLastINode();
@@ -519,8 +518,7 @@ class FSDirStatAndListingOp {
         // processed. 0 means disabled. I.e. blocking for the entire duration.
         ContentSummaryComputationContext cscc =
             new ContentSummaryComputationContext(fsd, fsd.getFSNamesystem(),
-                fsd.getContentCountLimit(), fsd.getContentSleepMicroSec(),
-                fsd.getPermissionChecker());
+                fsd.getContentCountLimit(), fsd.getContentSleepMicroSec(), pc);
         ContentSummary cs = targetNode.computeAndConvertContentSummary(
             iip.getPathSnapshotId(), cscc);
         fsd.addYieldCount(cscc.getYieldCount());
@@ -532,8 +530,7 @@ class FSDirStatAndListingOp {
   }
 
   static QuotaUsage getQuotaUsage(
-      FSDirectory fsd, String src) throws IOException {
-    FSPermissionChecker pc = fsd.getPermissionChecker();
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     final INodesInPath iip;
     fsd.readLock();
     try {
@@ -550,7 +547,7 @@ class FSDirStatAndListingOp {
       return usage;
     } else {
       //If quota isn't set, fall back to getContentSummary.
-      return getContentSummaryInt(fsd, iip);
+      return getContentSummaryInt(fsd, pc, iip);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index c4041a3..1c31056 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -349,7 +349,7 @@ class FSDirWriteFileOp {
    * {@link ClientProtocol#create}
    */
   static HdfsFileStatus startFile(
-      FSNamesystem fsn, INodesInPath iip,
+      FSNamesystem fsn, FSPermissionChecker pc, INodesInPath iip,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent,
       short replication, long blockSize,
@@ -408,7 +408,7 @@ class FSDirWriteFileOp {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
           src + " inode " + newNode.getId() + " " + holder);
     }
-    return FSDirStatAndListingOp.getFileInfo(fsd, iip);
+    return FSDirStatAndListingOp.getFileInfo(fsd, pc, iip);
   }
 
   static INodeFile addFileForEditLog(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index be3092c..24a475f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -51,22 +51,27 @@ class FSDirXAttrOp {
 
   /**
    * Set xattr for a file or directory.
-   *
+   * @param fsd
+   *          - FS directory
+   * @param pc
+   *          - FS permission checker
    * @param src
    *          - path on which it sets the xattr
    * @param xAttr
    *          - xAttr details to set
    * @param flag
    *          - xAttrs flags
+   * @param logRetryCache
+   *          - whether to record RPC ids in editlog for retry cache
+   *          rebuilding.
    * @throws IOException
    */
   static FileStatus setXAttr(
-      FSDirectory fsd, String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
-      boolean logRetryCache)
+      FSDirectory fsd, FSPermissionChecker pc, String src, XAttr xAttr,
+      EnumSet<XAttrSetFlag> flag, boolean logRetryCache)
       throws IOException {
     checkXAttrsConfigFlag(fsd);
     checkXAttrSize(fsd, xAttr);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     XAttrPermissionFilter.checkPermissionForApi(
         pc, xAttr, FSDirectory.isReservedRawName(src));
     List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
@@ -85,12 +90,10 @@ class FSDirXAttrOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static List<XAttr> getXAttrs(FSDirectory fsd, final String srcArg,
-                               List<XAttr> xAttrs)
-      throws IOException {
+  static List<XAttr> getXAttrs(FSDirectory fsd, FSPermissionChecker pc,
+      final String srcArg, List<XAttr> xAttrs) throws IOException {
     String src = srcArg;
     checkXAttrsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
     boolean getAll = xAttrs == null || xAttrs.isEmpty();
     if (!getAll) {
@@ -131,9 +134,8 @@ class FSDirXAttrOp {
   }
 
   static List<XAttr> listXAttrs(
-      FSDirectory fsd, String src) throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
     FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
-    final FSPermissionChecker pc = fsd.getPermissionChecker();
     final boolean isRawPath = FSDirectory.isReservedRawName(src);
     final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
     if (fsd.isPermissionEnabled()) {
@@ -146,18 +148,23 @@ class FSDirXAttrOp {
 
   /**
    * Remove an xattr for a file or directory.
-   *
+   * @param fsd
+   *          - FS directory
+   * @param pc
+   *          - FS permission checker
    * @param src
    *          - path to remove the xattr from
    * @param xAttr
    *          - xAttr to remove
+   * @param logRetryCache
+   *          - whether to record RPC ids in editlog for retry cache
+   *          rebuilding.
    * @throws IOException
    */
   static FileStatus removeXAttr(
-      FSDirectory fsd, String src, XAttr xAttr, boolean logRetryCache)
-      throws IOException {
+      FSDirectory fsd, FSPermissionChecker pc, String src, XAttr xAttr,
+      boolean logRetryCache) throws IOException {
     FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
-    FSPermissionChecker pc = fsd.getPermissionChecker();
     XAttrPermissionFilter.checkPermissionForApi(
         pc, xAttr, FSDirectory.isReservedRawName(src));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7210465..f41face 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1850,11 +1850,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setPermission";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set permission for " + src);
-      auditStat = FSDirAttrOp.setPermission(dir, src, permission);
+      auditStat = FSDirAttrOp.setPermission(dir, pc, src, permission);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -1874,11 +1875,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setOwner";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set owner for " + src);
-      auditStat = FSDirAttrOp.setOwner(dir, src, username, group);
+      auditStat = FSDirAttrOp.setOwner(dir, pc, src, username, group);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -1898,7 +1900,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "open";
     checkOperation(OperationCategory.READ);
     GetBlockLocationsResult res = null;
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2011,11 +2013,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "concat";
     FileStatus stat = null;
     boolean success = false;
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot concat " + target);
-      stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
+      stat = FSDirConcatOp.concat(dir, pc, target, srcs, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, Arrays.toString(srcs),
@@ -2039,11 +2042,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setTimes";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set times " + src);
-      auditStat = FSDirAttrOp.setTimes(dir, src, mtime, atime);
+      auditStat = FSDirAttrOp.setTimes(dir, pc, src, mtime, atime);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -2077,8 +2081,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         throw new HadoopIllegalArgumentException(
             "Cannot truncate to a negative file size: " + newLength + ".");
       }
-      final FSPermissionChecker pc = getPermissionChecker();
       checkOperation(OperationCategory.WRITE);
+      final FSPermissionChecker pc = getPermissionChecker();
       writeLock();
       BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
       try {
@@ -2147,11 +2151,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setReplication";
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set replication for " + src);
-      success = FSDirAttrOp.setReplication(dir, blockManager, src, replication);
+      success = FSDirAttrOp.setReplication(dir, pc, blockManager, src,
+          replication);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -2175,11 +2181,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "setStoragePolicy";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set storage policy for " + src);
-      auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
+      auditStat = FSDirAttrOp.setStoragePolicy(dir, pc, blockManager, src,
                                                policyName);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2200,11 +2207,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "unsetStoragePolicy";
     FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot unset storage policy for " + src);
-      auditStat = FSDirAttrOp.unsetStoragePolicy(dir, blockManager, src);
+      auditStat = FSDirAttrOp.unsetStoragePolicy(dir, pc, blockManager, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -2223,10 +2231,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   BlockStoragePolicy getStoragePolicy(String src) throws IOException {
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirAttrOp.getStoragePolicy(dir, blockManager, src);
+      return FSDirAttrOp.getStoragePolicy(dir, pc, blockManager, src);
     } finally {
       readUnlock("getStoragePolicy");
     }
@@ -2248,10 +2257,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   long getPreferredBlockSize(String src) throws IOException {
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirAttrOp.getPreferredBlockSize(dir, src);
+      return FSDirAttrOp.getPreferredBlockSize(dir, pc, src);
     } finally {
       readUnlock("getPreferredBlockSize");
     }
@@ -2355,13 +2365,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           "ecPolicyName are exclusive parameters. Set both is not allowed!");
     }
 
-    FSPermissionChecker pc = getPermissionChecker();
     INodesInPath iip = null;
     boolean skipSync = true; // until we do something that might create edits
     HdfsFileStatus stat = null;
     BlocksMapUpdateInfo toRemoveBlocks = null;
 
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2402,7 +2412,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       toRemoveBlocks = new BlocksMapUpdateInfo();
       dir.writeLock();
       try {
-        stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
+        stat = FSDirWriteFileOp.startFile(this, pc, iip, permissions, holder,
             clientMachine, flag, createParent, replication, blockSize, feInfo,
             toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
       } catch (IOException e) {
@@ -2442,8 +2452,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean recoverLease(String src, String holder, String clientMachine)
       throws IOException {
     boolean skipSync = false;
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2582,8 +2592,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       boolean skipSync = false;
       LastBlockWithStatus lbs = null;
-      final FSPermissionChecker pc = getPermissionChecker();
       checkOperation(OperationCategory.WRITE);
+      final FSPermissionChecker pc = getPermissionChecker();
       writeLock();
       try {
         checkOperation(OperationCategory.WRITE);
@@ -2638,8 +2648,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     LocatedBlock[] onRetryBlock = new LocatedBlock[1];
     FSDirWriteFileOp.ValidateAddBlockResult r;
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2689,7 +2699,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final List<DatanodeStorageInfo> chosen;
     final BlockType blockType;
     checkOperation(OperationCategory.READ);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -2737,7 +2747,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     NameNode.stateChangeLog.debug(
         "BLOCK* NameSystem.abandonBlock: {} of file {}", b, src);
     checkOperation(OperationCategory.WRITE);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2802,7 +2812,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     throws IOException {
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2880,11 +2890,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "rename";
     FSDirRenameOp.RenameResult ret = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
-      ret = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache);
+      ret = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache);
     } catch (AccessControlException e)  {
       logAuditEvent(false, operationName, src, dst, null);
       throw e;
@@ -2904,11 +2916,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "rename";
     FSDirRenameOp.RenameResult res = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename " + src);
-      res = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache, options);
+      res = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache,
+          options);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName + " (options=" +
           Arrays.toString(options) + ")", src, dst, null);
@@ -2939,13 +2954,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     final String operationName = "delete";
     BlocksMapUpdateInfo toRemovedBlocks = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     boolean ret = false;
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete " + src);
       toRemovedBlocks = FSDirDeleteOp.delete(
-          this, src, recursive, logRetryCache);
+          this, pc, src, recursive, logRetryCache);
       ret = toRemovedBlocks != null;
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -3039,10 +3056,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "getfileinfo";
     checkOperation(OperationCategory.READ);
     HdfsFileStatus stat = null;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      stat = FSDirStatAndListingOp.getFileInfo(dir, src, resolveLink);
+      stat = FSDirStatAndListingOp.getFileInfo(
+          dir, pc, src, resolveLink);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -3059,10 +3078,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean isFileClosed(final String src) throws IOException {
     final String operationName = "isFileClosed";
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirStatAndListingOp.isFileClosed(dir, src);
+      return FSDirStatAndListingOp.isFileClosed(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -3079,11 +3099,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "mkdirs";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create directory " + src);
-      auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent);
+      auditStat = FSDirMkdirOp.mkdirs(this, pc, src, permissions,
+          createParent);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -3112,12 +3134,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   ContentSummary getContentSummary(final String src) throws IOException {
     checkOperation(OperationCategory.READ);
     final String operationName = "contentSummary";
-    readLock();
     boolean success = true;
     ContentSummary cs;
+    final FSPermissionChecker pc = getPermissionChecker();
+    readLock();
     try {
       checkOperation(OperationCategory.READ);
-      cs = FSDirStatAndListingOp.getContentSummary(dir, src);
+      cs = FSDirStatAndListingOp.getContentSummary(dir, pc, src);
     } catch (AccessControlException ace) {
       success = false;
       logAuditEvent(success, operationName, src);
@@ -3147,11 +3170,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     final String operationName = "quotaUsage";
     QuotaUsage quotaUsage;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     boolean success = true;
     try {
       checkOperation(OperationCategory.READ);
-      quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, src);
+      quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, pc, src);
     } catch (AccessControlException ace) {
       success = false;
       logAuditEvent(success, operationName, src);
@@ -3177,12 +3201,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     checkOperation(OperationCategory.WRITE);
     final String operationName = "setQuota";
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set quota on " + src);
-      FSDirAttrOp.setQuota(dir, src, nsQuota, ssQuota, type);
+      FSDirAttrOp.setQuota(dir, pc, src, nsQuota, ssQuota, type);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, src);
@@ -3209,8 +3234,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     checkOperation(OperationCategory.WRITE);
-
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -3714,10 +3738,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     final String operationName = "listStatus";
     DirectoryListing dl = null;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(NameNode.OperationCategory.READ);
-      dl = getListingInt(dir, src, startAfter, needLocation);
+      dl = getListingInt(dir, pc, src, startAfter, needLocation);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -4621,6 +4646,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  void checkSuperuserPrivilege(FSPermissionChecker pc)
+      throws AccessControlException {
+    if (isPermissionEnabled) {
+      pc.checkSuperuserPrivilege();
+    }
+  }
+
   /**
    * Check to see if we have exceeded the limit on the number
    * of inodes.
@@ -6303,14 +6335,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   String createSnapshot(String snapshotRoot, String snapshotName,
                         boolean logRetryCache) throws IOException {
+    checkOperation(OperationCategory.WRITE);
     final String operationName = "createSnapshot";
     String snapshotPath = null;
     boolean success = false;
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
-      snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
+      snapshotPath = FSDirSnapshotOp.createSnapshot(dir, pc,
           snapshotManager, snapshotRoot, snapshotName, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
@@ -6337,15 +6371,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void renameSnapshot(
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
+    checkOperation(OperationCategory.WRITE);
     final String operationName = "renameSnapshot";
     boolean success = false;
     String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
     String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot rename snapshot for " + path);
-      FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
+      FSDirSnapshotOp.renameSnapshot(dir, pc, snapshotManager, path,
           snapshotOldName, snapshotNewName, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
@@ -6373,10 +6409,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     SnapshottableDirectoryStatus[] status = null;
     checkOperation(OperationCategory.READ);
     boolean success = false;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
+      status = FSDirSnapshotOp.getSnapshottableDirListing(dir, pc,
+          snapshotManager);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, null, null, null);
@@ -6413,10 +6451,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         path : Snapshot.getSnapshotPath(path, fromSnapshot);
     String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
         path : Snapshot.getSnapshotPath(path, toSnapshot);
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
+      diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, pc, snapshotManager,
           path, fromSnapshot, toSnapshot);
       success = true;
     } catch (AccessControlException ace) {
@@ -6430,7 +6469,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         toSnapshotRoot, null);
     return diffs;
   }
-  
+
   /**
    * Delete a snapshot of a snapshottable directory
    * @param snapshotRoot The snapshottable directory
@@ -6443,14 +6482,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "deleteSnapshot";
     boolean success = false;
     String rootPath = null;
-    writeLock();
     BlocksMapUpdateInfo blocksToBeDeleted = null;
+    final FSPermissionChecker pc = getPermissionChecker();
+    writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
       rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
-      blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
-          snapshotRoot, snapshotName, logRetryCache);
+      blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, pc,
+          snapshotManager, snapshotRoot, snapshotName, logRetryCache);
       success = true;
     } catch (AccessControlException ace) {
       logAuditEvent(success, operationName, rootPath, null, null);
@@ -6930,11 +6970,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "modifyAclEntries";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
-      auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec);
+      auditStat = FSDirAclOp.modifyAclEntries(dir, pc, src, aclSpec);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -6950,11 +6991,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final String operationName = "removeAclEntries";
     checkOperation(OperationCategory.WRITE);
     FileStatus auditStat = null;
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
-      auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec);
+      auditStat = FSDirAclOp.removeAclEntries(dir, pc, src, aclSpec);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -6969,11 +7011,12 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     final String operationName = "removeDefaultAcl";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
-      auditStat = FSDirAclOp.removeDefaultAcl(dir, src);
+      auditStat = FSDirAclOp.removeDefaultAcl(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -6988,11 +7031,12 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     final String operationName = "removeAcl";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove ACL on " + src);
-      auditStat = FSDirAclOp.removeAcl(dir, src);
+      auditStat = FSDirAclOp.removeAcl(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7007,11 +7051,12 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     final String operationName = "setAcl";
     FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set ACL on " + src);
-      auditStat = FSDirAclOp.setAcl(dir, src, aclSpec);
+      auditStat = FSDirAclOp.setAcl(dir, pc, src, aclSpec);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7026,10 +7071,11 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     final String operationName = "getAclStatus";
     checkOperation(OperationCategory.READ);
     final AclStatus ret;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      ret = FSDirAclOp.getAclStatus(dir, src);
+      ret = FSDirAclOp.getAclStatus(dir, pc, src);
     } catch(AccessControlException ace) {
       logAuditEvent(false, operationName, src);
       throw ace;
@@ -7058,13 +7104,13 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     try {
       Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
           keyName, src);
-      checkSuperuserPrivilege();
-      FSPermissionChecker pc = getPermissionChecker();
+      final FSPermissionChecker pc = getPermissionChecker();
+      checkSuperuserPrivilege(pc);
       checkOperation(OperationCategory.WRITE);
       final FileStatus resultingStat;
       writeLock();
       try {
-        checkSuperuserPrivilege();
+        checkSuperuserPrivilege(pc);
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot create encryption zone on " + src);
         resultingStat = FSDirEncryptionZoneOp.createEncryptionZone(dir, src,
@@ -7119,12 +7165,13 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
       throws IOException {
     final String operationName = "listEncryptionZones";
     boolean success = false;
-    checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
+    checkSuperuserPrivilege(pc);
     readLock();
     try {
-      checkSuperuserPrivilege();
       checkOperation(OperationCategory.READ);
+      checkSuperuserPrivilege(pc);
       final BatchedListEntries<EncryptionZone> ret =
           FSDirEncryptionZoneOp.listEncryptionZones(dir, prevId);
       success = true;
@@ -7140,11 +7187,12 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     boolean success = false;
     try {
       Preconditions.checkNotNull(zone, "zone is null.");
-      checkSuperuserPrivilege();
       checkOperation(OperationCategory.WRITE);
+      final FSPermissionChecker pc = dir.getPermissionChecker();
+      checkSuperuserPrivilege(pc);
       checkNameNodeSafeMode("NameNode in safemode, cannot " + action
           + " re-encryption on zone " + zone);
-      reencryptEncryptionZoneInt(zone, action, logRetryCache);
+      reencryptEncryptionZoneInt(pc, zone, action, logRetryCache);
       success = true;
     } finally {
       logAuditEvent(success, action + "reencryption", zone, null, null);
@@ -7155,12 +7203,13 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
       final long prevId) throws IOException {
     final String operationName = "listReencryptionStatus";
     boolean success = false;
-    checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = getPermissionChecker();
+    checkSuperuserPrivilege(pc);
     readLock();
     try {
-      checkSuperuserPrivilege();
       checkOperation(OperationCategory.READ);
+      checkSuperuserPrivilege(pc);
       final BatchedListEntries<ZoneReencryptionStatus> ret =
           FSDirEncryptionZoneOp.listReencryptionStatus(dir, prevId);
       success = true;
@@ -7171,9 +7220,9 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     }
   }
 
-  private void reencryptEncryptionZoneInt(final String zone,
-      final ReencryptAction action, final boolean logRetryCache)
-      throws IOException {
+  private void reencryptEncryptionZoneInt(final FSPermissionChecker pc,
+      final String zone, final ReencryptAction action,
+      final boolean logRetryCache) throws IOException {
     if (getProvider() == null) {
       throw new IOException("No key provider configured, re-encryption "
           + "operation is rejected");
@@ -7181,7 +7230,8 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     String keyVersionName = null;
     if (action == ReencryptAction.START) {
       // get zone's latest key version name out of the lock.
-      keyVersionName = FSDirEncryptionZoneOp.getCurrentKeyVersion(dir, zone);
+      keyVersionName =
+          FSDirEncryptionZoneOp.getCurrentKeyVersion(dir, pc, zone);
       if (keyVersionName == null) {
         throw new IOException("Failed to get key version name for " + zone);
       }
@@ -7190,11 +7240,10 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     }
     writeLock();
     try {
-      checkSuperuserPrivilege();
+      checkSuperuserPrivilege(pc);
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("NameNode in safemode, cannot " + action
           + " re-encryption on zone " + zone);
-      final FSPermissionChecker pc = dir.getPermissionChecker();
       List<XAttr> xattrs;
       dir.writeLock();
       try {
@@ -7429,7 +7478,7 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     final String operationName = "getErasureCodingPolicy";
     boolean success = false;
     checkOperation(OperationCategory.READ);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -7488,11 +7537,14 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
       throws IOException {
     final String operationName = "setXAttr";
     FileStatus auditStat = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set XAttr on " + src);
-      auditStat = FSDirXAttrOp.setXAttr(dir, src, xAttr, flag, logRetryCache);
+      auditStat = FSDirXAttrOp.setXAttr(dir, pc, src, xAttr, flag,
+          logRetryCache);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7508,10 +7560,11 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     final String operationName = "getXAttrs";
     checkOperation(OperationCategory.READ);
     List<XAttr> fsXattrs;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      fsXattrs = FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
+      fsXattrs = FSDirXAttrOp.getXAttrs(dir, pc, src, xAttrs);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7526,10 +7579,11 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     final String operationName = "listXAttrs";
     checkOperation(OperationCategory.READ);
     List<XAttr> fsXattrs;
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      fsXattrs = FSDirXAttrOp.listXAttrs(dir, src);
+      fsXattrs = FSDirXAttrOp.listXAttrs(dir, pc, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7544,11 +7598,13 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
       throws IOException {
     final String operationName = "removeXAttr";
     FileStatus auditStat = null;
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
-      auditStat = FSDirXAttrOp.removeXAttr(dir, src, xAttr, logRetryCache);
+      auditStat = FSDirXAttrOp.removeXAttr(dir, pc, src, xAttr, logRetryCache);
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
       throw e;
@@ -7562,7 +7618,7 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
   void checkAccess(String src, FsAction mode) throws IOException {
     final String operationName = "checkAccess";
     checkOperation(OperationCategory.READ);
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
@@ -7813,5 +7869,16 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
         .size();
   }
 
+  // This method logs operationName when the caller lacks superuser privilege.
+  // It should be called without holding FSN lock.
+  void checkSuperuserPrivilege(String operationName)
+      throws IOException {
+    try {
+      checkSuperuserPrivilege();
+    } catch (AccessControlException ace) {
+      logAuditEvent(false, operationName, null);
+      throw ace;
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 242e8f5..4eac7e1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -72,12 +72,14 @@ public class NameNodeAdapter {
   }
   
   public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
-      boolean resolveLink) throws AccessControlException, 
UnresolvedLinkException,
-        StandbyException, IOException {
+      boolean resolveLink) throws AccessControlException,
+      UnresolvedLinkException, StandbyException, IOException {
+    final FSPermissionChecker pc =
+        namenode.getNamesystem().getPermissionChecker();
     namenode.getNamesystem().readLock();
     try {
       return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
-          .getFSDirectory(), src, resolveLink);
+          .getFSDirectory(), pc, src, resolveLink);
     } finally {
       namenode.getNamesystem().readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
index 0e3cc8d..e028675 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
@@ -75,6 +75,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 
 /**
@@ -412,7 +413,7 @@ public class TestAuditLogger {
 
       final FSDirectory mockedDir = Mockito.spy(dir);
       AccessControlException ex = new AccessControlException();
-      doThrow(ex).when(mockedDir).getPermissionChecker();
+      doThrow(ex).when(mockedDir).checkTraverse(any(), any(), any());
       cluster.getNamesystem().setFSDirectory(mockedDir);
       assertTrue(DummyAuditLogger.initialized);
       DummyAuditLogger.resetLogCount();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec1b445c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
index 2adf470..e45f93f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java
@@ -46,8 +46,9 @@ import org.junit.Test;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import org.mockito.Mock;
 import org.mockito.Mockito;
+
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 
 public class TestAuditLoggerWithCommands {
@@ -529,7 +530,7 @@ public class TestAuditLoggerWithCommands {
 
   @Test
   public void testListXattrs() throws Exception {
-    Path path = new Path("/test");
+    Path path = new Path("/testListXattrs");
     fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
     fs.mkdirs(path);
     fs.setOwner(path,user1.getUserName(),user1.getPrimaryGroupName());
@@ -548,7 +549,7 @@ public class TestAuditLoggerWithCommands {
     final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
     final FSDirectory mockedDir = Mockito.spy(dir);
     AccessControlException ex = new AccessControlException();
-    doThrow(ex).when(mockedDir).getPermissionChecker();
+    doThrow(ex).when(mockedDir).checkTraverse(any(), any(), any());
     cluster.getNamesystem().setFSDirectory(mockedDir);
     String aceGetAclStatus =
         ".*allowed=false.*ugi=theDoctor.*cmd=getAclStatus.*";


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to