HIVE-13989: Extended ACLs are not handled according to specification (Chris 
Drome reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/988c491d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/988c491d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/988c491d

Branch: refs/heads/branch-2
Commit: 988c491dd3d22ace3d34635b9a8d53abc4dbf1c7
Parents: b3a6e52
Author: Vaibhav Gumashta <vgumas...@hortonworks.com>
Authored: Sat Sep 9 12:15:53 2017 -0700
Committer: Vaibhav Gumashta <vgumas...@hortonworks.com>
Committed: Sat Sep 9 12:15:53 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   7 +-
 .../org/apache/hadoop/hive/io/HdfsUtils.java    | 135 ++--
 .../apache/hadoop/hive/io/TestHdfsUtils.java    |   9 +-
 .../mapreduce/FileOutputCommitterContainer.java | 147 +++-
 .../org/apache/hive/hcatalog/MiniCluster.java   |  16 +-
 itests/hcatalog-unit/pom.xml                    |   6 +
 .../hive/hcatalog/pig/TestExtendedAcls.java     | 748 +++++++++++++++++++
 .../hive/ql/security/TestExtendedAcls.java      | 228 +++++-
 .../hive/ql/security/FolderPermissionBase.java  | 199 +++--
 .../hive/ql/security/TestFolderPermissions.java |   5 +
 .../hadoop/hive/metastore/HiveAlterHandler.java |   2 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   6 +-
 12 files changed, 1349 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java 
b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 8ed8cc4..2b7a57b 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -1023,6 +1023,11 @@ public final class FileUtils {
 
   public static void inheritPerms(Configuration conf, 
HdfsUtils.HadoopFileStatus sourceStatus, String targetGroup,
                                   FileSystem fs, Path target, boolean 
recursive) {
-    HdfsUtils.setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, 
recursive);
+    inheritPerms(conf, sourceStatus, targetGroup, fs, target, recursive, true);
+  }
+
+  public static void inheritPerms(Configuration conf, 
HdfsUtils.HadoopFileStatus sourceStatus, String targetGroup,
+                                  FileSystem fs, Path target, boolean 
recursive, boolean isDir) {
+    HdfsUtils.setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, 
recursive, isDir);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/common/src/java/org/apache/hadoop/hive/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/io/HdfsUtils.java 
b/common/src/java/org/apache/hadoop/hive/io/HdfsUtils.java
index 1b57184..16fc96e 100644
--- a/common/src/java/org/apache/hadoop/hive/io/HdfsUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/io/HdfsUtils.java
@@ -77,7 +77,7 @@ public class HdfsUtils {
   public static void setFullFileStatus(Configuration conf, 
HdfsUtils.HadoopFileStatus sourceStatus,
       FileSystem fs, Path target, boolean recursion) {
     if (StorageUtils.shouldSetPerms(conf, fs)) {
-      setFullFileStatus(conf, sourceStatus, null, fs, target, recursion);
+      setFullFileStatus(conf, sourceStatus, null, fs, target, recursion, true);
     }
   }
 
@@ -95,14 +95,25 @@ public class HdfsUtils {
    */
   public static void setFullFileStatus(Configuration conf, 
HdfsUtils.HadoopFileStatus sourceStatus,
       String targetGroup, FileSystem fs, Path target, boolean recursion) {
+    setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, recursion, 
true);
+  }
+
+  public static void setFullFileStatus(Configuration conf, 
HdfsUtils.HadoopFileStatus sourceStatus,
+      String targetGroup, FileSystem fs, Path target, boolean recursion, 
boolean isDir) {
     if (StorageUtils.shouldSetPerms(conf, fs)) {
-      setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, 
recursion, recursion ? new FsShell() : null);
+      setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, 
recursion, recursion ? new FsShell() : null, isDir);
     }
   }
 
   @VisibleForTesting
   static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus 
sourceStatus,
     String targetGroup, FileSystem fs, Path target, boolean recursion, FsShell 
fsShell) {
+    setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, recursion, 
fsShell, true);
+  }
+
+  @VisibleForTesting
+  static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus 
sourceStatus,
+    String targetGroup, FileSystem fs, Path target, boolean recursion, FsShell 
fsShell, boolean isDir) {
     try {
       FileStatus fStatus = sourceStatus.getFileStatus();
       String group = fStatus.getGroup();
@@ -110,15 +121,47 @@ public class HdfsUtils {
       FsPermission sourcePerm = fStatus.getPermission();
       List<AclEntry> aclEntries = null;
       if (aclEnabled) {
-        if (sourceStatus.getAclEntries() != null) {
+        if (sourceStatus.getAclEntries() != null && ! 
sourceStatus.getAclEntries().isEmpty()) {
           LOG.trace(sourceStatus.getAclStatus().toString());
-          aclEntries = new ArrayList<>(sourceStatus.getAclEntries());
-          removeBaseAclEntries(aclEntries);
 
-          //the ACL api's also expect the tradition user/group/other 
permission in the form of ACL
-          aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, 
sourcePerm.getUserAction()));
-          aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, 
sourcePerm.getGroupAction()));
-          aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, 
sourcePerm.getOtherAction()));
+          List<AclEntry> defaults = 
extractDefaultAcls(sourceStatus.getAclEntries());
+          if (! defaults.isEmpty()) {
+            // Generate child ACLs based on parent DEFAULTs.
+            aclEntries = new ArrayList<AclEntry>(defaults.size() * 2);
+
+            // All ACCESS ACLs are derived from the DEFAULT ACLs of the parent.
+            // All DEFAULT ACLs of the parent are inherited by the child.
+            // If DEFAULT ACLs exist, it should include DEFAULTs for USER, 
OTHER, and MASK.
+            for (AclEntry acl : defaults) {
+              // OTHER permissions are not inherited by the child.
+              if (acl.getType() == AclEntryType.OTHER) {
+                aclEntries.add(newAclEntry(AclEntryScope.ACCESS, 
AclEntryType.OTHER, FsAction.NONE));
+              } else {
+                aclEntries.add(newAclEntry(AclEntryScope.ACCESS, 
acl.getType(), acl.getName(), acl.getPermission()));
+              }
+            }
+
+            // Add DEFAULTs for directories only; adding DEFAULTs for files 
throws an exception.
+            if (isDir) {
+              aclEntries.addAll(defaults);
+            }
+          } else {
+            // Parent has no DEFAULTs, hence child inherits no ACLs.
+            // Set basic permissions only.
+            FsAction groupAction = null;
+
+            for (AclEntry acl : sourceStatus.getAclEntries()) {
+              if (acl.getType() == AclEntryType.GROUP) {
+                groupAction = acl.getPermission();
+                break;
+              }
+            }
+
+            aclEntries = new ArrayList<AclEntry>(3);
+            aclEntries.add(newAclEntry(AclEntryScope.ACCESS, 
AclEntryType.USER, sourcePerm.getUserAction()));
+            aclEntries.add(newAclEntry(AclEntryScope.ACCESS, 
AclEntryType.GROUP, groupAction));
+            aclEntries.add(newAclEntry(AclEntryScope.ACCESS, 
AclEntryType.OTHER, FsAction.NONE));
+          }
         }
       }
 
@@ -129,19 +172,16 @@ public class HdfsUtils {
         if (group != null && !group.isEmpty()) {
           run(fsShell, new String[]{"-chgrp", "-R", group, target.toString()});
         }
-        if (aclEnabled) {
-          if (null != aclEntries) {
-            //Attempt extended Acl operations only if its enabled, 8791but 
don't fail the operation regardless.
-            try {
-              //construct the -setfacl command
-              String aclEntry = Joiner.on(",").join(aclEntries);
-              run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, 
target.toString()});
-
-            } catch (Exception e) {
-              LOG.info("Skipping ACL inheritance: File system for path " + 
target + " " +
-                  "does not support ACLs but dfs.namenode.acls.enabled is set 
to true. ");
-              LOG.debug("The details are: " + e, e);
-            }
+
+        if (aclEntries != null) {
+          try {
+            //construct the -setfacl command
+            String aclEntry = Joiner.on(",").join(aclEntries);
+            run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, 
target.toString()});
+          } catch (Exception e) {
+            LOG.info("Skipping ACL inheritance: File system for path " + 
target + " " +
+                "does not support ACLs but dfs.namenode.acls.enabled is set to 
true. ");
+            LOG.debug("The details are: " + e, e);
           }
         } else {
           String permission = Integer.toString(sourcePerm.toShort(), 8);
@@ -149,15 +189,13 @@ public class HdfsUtils {
         }
       } else {
         if (group != null && !group.isEmpty()) {
-          if (targetGroup == null ||
-              !group.equals(targetGroup)) {
+          if (targetGroup == null || !group.equals(targetGroup)) {
             fs.setOwner(target, null, group);
           }
         }
-        if (aclEnabled) {
-          if (null != aclEntries) {
-            fs.setAcl(target, aclEntries);
-          }
+
+        if (aclEntries != null) {
+          fs.setAcl(target, aclEntries);
         } else {
           fs.setPermission(target, sourcePerm);
         }
@@ -172,35 +210,48 @@ public class HdfsUtils {
 
   /**
    * Create a new AclEntry with scope, type and permission (no name).
-   *
-   * @param scope
-   *          AclEntryScope scope of the ACL entry
-   * @param type
-   *          AclEntryType ACL entry type
-   * @param permission
-   *          FsAction set of permissions in the ACL entry
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @param permission FsAction set of permissions in the ACL entry
    * @return AclEntry new AclEntry
    */
   private static AclEntry newAclEntry(AclEntryScope scope, AclEntryType type,
       FsAction permission) {
-    return new AclEntry.Builder().setScope(scope).setType(type)
+    return newAclEntry(scope, type, null, permission);
+  }
+
+  /**
+   * Create a new AclEntry with scope, type and permission (no name).
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @param name AclEntry name
+   * @param permission FsAction set of permissions in the ACL entry
+   * @return AclEntry new AclEntry
+   */
+  private static AclEntry newAclEntry(AclEntryScope scope, AclEntryType type,
+      String name, FsAction permission) {
+    return new AclEntry.Builder().setScope(scope).setType(type).setName(name)
         .setPermission(permission).build();
   }
 
   /**
-   * Removes basic permission acls (unamed acls) from the list of acl entries
-   * @param entries acl entries to remove from.
+   * Extracts the DEFAULT ACL entries from the list of acl entries
+   * @param acls acl entries to extract from
+   * @return default unnamed acl entries
    */
-  private static void removeBaseAclEntries(List<AclEntry> entries) {
-    Iterables.removeIf(entries, new Predicate<AclEntry>() {
+  private static List<AclEntry> extractDefaultAcls(List<AclEntry> acls) {
+    List<AclEntry> defaults = new ArrayList<AclEntry>(acls);
+    Iterables.removeIf(defaults, new Predicate<AclEntry>() {
       @Override
-      public boolean apply(AclEntry input) {
-        if (input.getName() == null) {
+      public boolean apply(AclEntry acl) {
+        if (! acl.getScope().equals(AclEntryScope.DEFAULT)) {
           return true;
         }
         return false;
       }
     });
+
+    return defaults;
   }
 
   private static void run(FsShell shell, String[] command) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/common/src/test/org/apache/hadoop/hive/io/TestHdfsUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/io/TestHdfsUtils.java 
b/common/src/test/org/apache/hadoop/hive/io/TestHdfsUtils.java
index 79507e4..fff92b4 100644
--- a/common/src/test/org/apache/hadoop/hive/io/TestHdfsUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/io/TestHdfsUtils.java
@@ -29,7 +29,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -79,11 +82,15 @@ public class TestHdfsUtils {
     FileStatus mockSourceStatus = mock(FileStatus.class);
     AclStatus mockAclStatus = mock(AclStatus.class);
     FileSystem mockFs = mock(FileSystem.class);
+    ArrayList<AclEntry> acls = new ArrayList<AclEntry>();
+    acls.add(new 
AclEntry.Builder().setScope(AclEntryScope.ACCESS).setType(AclEntryType.USER).setPermission(FsAction.ALL).build());
+    acls.add(new 
AclEntry.Builder().setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).setPermission(FsAction.READ_EXECUTE).build());
+    acls.add(new 
AclEntry.Builder().setScope(AclEntryScope.ACCESS).setType(AclEntryType.OTHER).setPermission(FsAction.NONE).build());
 
     when(mockSourceStatus.getPermission()).thenReturn(new FsPermission((short) 
777));
     when(mockAclStatus.toString()).thenReturn("");
     when(mockHadoopFileStatus.getFileStatus()).thenReturn(mockSourceStatus);
-    when(mockHadoopFileStatus.getAclEntries()).thenReturn(new 
ArrayList<AclEntry>());
+    when(mockHadoopFileStatus.getAclEntries()).thenReturn(acls);
     when(mockHadoopFileStatus.getAclStatus()).thenReturn(mockAclStatus);
     doThrow(RuntimeException.class).when(mockFs).setAcl(any(Path.class), 
any(List.class));
 

http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index 847c3bc..5357e12 100644
--- 
a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -33,6 +33,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -65,6 +70,9 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
 /**
  * Part of the FileOutput*Container classes
  * See {@link FileOutputFormatContainer} for more information
@@ -335,7 +343,7 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
     String partLocnRoot, String dynPartPath, Map<String, String> partKVs,
     HCatSchema outputSchema, Map<String, String> params,
     Table table, FileSystem fs,
-    String grpName, FsPermission perms) throws IOException {
+    String grpName, FsPermission perms, List<AclEntry> acls) throws 
IOException {
 
     Partition partition = new Partition();
     partition.setDbName(table.getDbName());
@@ -372,7 +380,7 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
       for (FieldSchema partKey : table.getPartitionKeys()) {
         if (i++ != 0) {
           fs.mkdirs(partPath); // Attempt to make the path in case it does not 
exist before we check
-          applyGroupAndPerms(fs, partPath, perms, grpName, false);
+          applyGroupAndPerms(fs, partPath, perms, acls, grpName, false);
         }
         partPath = constructPartialPartPath(partPath, 
partKey.getName().toLowerCase(), partKVs);
       }
@@ -382,7 +390,7 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
     // Need not bother in case of HDFS as permission is taken care of by 
setting UMask
     fs.mkdirs(partPath); // Attempt to make the path in case it does not exist 
before we check
     if (!ShimLoader.getHadoopShims().getHCatShim().isFileInHDFS(fs, partPath)) 
{
-      applyGroupAndPerms(fs, partPath, perms, grpName, true);
+      applyGroupAndPerms(fs, partPath, perms, acls, grpName, true);
     }
 
     // Set the location in the StorageDescriptor
@@ -401,21 +409,29 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
     return partition;
   }
 
-  private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission 
permission,
-                  String group, boolean recursive)
+  private void applyGroupAndPerms(FileSystem fs, Path path, FsPermission 
permission,
+                  List<AclEntry> acls, String group, boolean recursive)
     throws IOException {
     if(LOG.isDebugEnabled()) {
-      LOG.debug("applyGroupAndPerms : " + dir +
-          " perms: " + permission +
+      LOG.debug("applyGroupAndPerms : " + path +
+          " perms: " + permission + " acls: " + acls + 
           " group: " + group + " recursive: " + recursive);
     }
-    fs.setPermission(dir, permission);
+
+    if (acls != null && ! acls.isEmpty()) {
+      fs.setAcl(path, acls);
+    } else {
+      fs.setPermission(path, permission);
+    }
+
     if (recursive) {
-      for (FileStatus fileStatus : fs.listStatus(dir)) {
+      List<AclEntry> fileAcls = removeDefaultAcls(acls);
+
+      for (FileStatus fileStatus : fs.listStatus(path)) {
         if (fileStatus.isDir()) {
-          applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, 
true);
+          applyGroupAndPerms(fs, fileStatus.getPath(), permission, acls, 
group, true);
         } else {
-          fs.setPermission(fileStatus.getPath(), permission);
+          applyGroupAndPerms(fs, fileStatus.getPath(), permission, fileAcls, 
group, false);
         }
       }
     }
@@ -775,6 +791,23 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
     try {
       HiveConf hiveConf = HCatUtil.getHiveConf(conf);
       client = HCatUtil.getHiveMetastoreClient(hiveConf);
+
+      FileStatus tblStat = fs.getFileStatus(tblPath);
+      String grpName = tblStat.getGroup();
+      FsPermission perms = tblStat.getPermission();
+      List<AclEntry> acls = null;
+
+      if (conf.getBoolean("dfs.namenode.acls.enabled", false)) {
+        try {
+          AclStatus stat = fs.getAclStatus(tblPath);
+          if (stat != null && ! stat.getEntries().isEmpty()) {
+            acls = generateChildAcls(stat.getEntries(), perms);
+          }
+        } catch (UnsupportedOperationException e) {
+          LOG.debug("Skipping ACLs", e);
+        }
+      }
+
       if (table.getPartitionKeys().size() == 0) {
         // Move data from temp directory the actual table directory
         // No metastore operation required.
@@ -788,27 +821,26 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
           table.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
           client.alter_table(table.getDbName(), table.getTableName(), 
table.getTTable());
         }
+
+        applyGroupAndPerms(fs, tblPath, tblStat.getPermission(), acls, 
grpName, true);
+
         return;
       }
 
       StorerInfo storer = 
InternalUtil.extractStorerInfo(table.getTTable().getSd(),
           table.getParameters());
 
-      FileStatus tblStat = fs.getFileStatus(tblPath);
-      String grpName = tblStat.getGroup();
-      FsPermission perms = tblStat.getPermission();
-
       List<Partition> partitionsToAdd = new ArrayList<Partition>();
       if (!dynamicPartitioningUsed) {
         partitionsToAdd.add(constructPartition(context, jobInfo, 
tblPath.toString(), null,
             jobInfo.getPartitionValues(), jobInfo.getOutputSchema(), 
getStorerParameterMap(storer),
-            table, fs, grpName, perms));
+            table, fs, grpName, perms, acls));
       } else {
         for (Entry<String, Map<String, String>> entry : 
partitionsDiscoveredByPath.entrySet()) {
           partitionsToAdd.add(constructPartition(context, jobInfo,
               getPartitionRootLocation(entry.getKey(), 
entry.getValue().size()), entry.getKey(),
               entry.getValue(), jobInfo.getOutputSchema(), 
getStorerParameterMap(storer), table,
-              fs, grpName, perms));
+              fs, grpName, perms, acls));
         }
       }
 
@@ -950,7 +982,7 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
         // Set permissions appropriately for each of the partitions we just 
created
         // so as to have their permissions mimic the table permissions
         for (Partition p : partitionsAdded){
-          applyGroupAndPerms(fs,new 
Path(p.getSd().getLocation()),tblStat.getPermission(),tblStat.getGroup(),true);
+          applyGroupAndPerms(fs,new Path(p.getSd().getLocation()), 
tblStat.getPermission(), acls, tblStat.getGroup(), true);
         }
 
       }
@@ -1024,5 +1056,84 @@ class FileOutputCommitterContainer extends 
OutputCommitterContainer {
     }
   }
 
+  private AclEntry createAclEntry(AclEntryScope scope, AclEntryType type, 
String name, FsAction perm) {
+    return new AclEntry.Builder().setScope(scope).setType(type)
+      .setName(name).setPermission(perm).build();
+  }
+
+  private List<AclEntry> generateChildAcls(List<AclEntry> parentAcls, 
FsPermission parentPerms) {
+    List<AclEntry> acls = null;
+    List<AclEntry> defaults = extractDefaultAcls(parentAcls);
+    if (! defaults.isEmpty()) {
+      // Generate child ACLs based on parent DEFAULTs.
+      acls = new ArrayList<AclEntry>(defaults.size() * 2);
+
+      // All ACCESS ACLs are derived from the DEFAULT ACLs of the parent.
+      // All DEFAULT ACLs of the parent are inherited by the child.
+      // If DEFAULT ACLs exist, it should include DEFAULTs for USER, OTHER, 
and MASK.
+      for (AclEntry acl : defaults) {
+        // OTHER permissions are not inherited by the child.
+        if (acl.getType() == AclEntryType.OTHER) {
+          acls.add(createAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, 
null, FsAction.NONE));
+        } else {
+          acls.add(createAclEntry(AclEntryScope.ACCESS, acl.getType(), 
acl.getName(), acl.getPermission()));
+        }
+      }
+
+      // Inherit all DEFAULT ACLs.
+      acls.addAll(defaults);
+    } else {
+      // Parent has no DEFAULTs, hence child inherits no ACLs.
+      // Set basic permissions only.
+      FsAction groupAction = null;
+
+      for (AclEntry acl : parentAcls) {
+        if (acl.getType() == AclEntryType.GROUP) {
+          groupAction = acl.getPermission();
+          break;
+        }
+      }
+
+      acls = new ArrayList<AclEntry>(3);
+      acls.add(createAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, null, 
parentPerms.getUserAction()));
+      acls.add(createAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, null, 
groupAction));
+      acls.add(createAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, null, 
FsAction.NONE));
+    }
+
+    return acls;
+  }
+
+  private static List<AclEntry> extractDefaultAcls(List<AclEntry> acls) {
+    List<AclEntry> defaults = new ArrayList<AclEntry>(acls);
+    Iterables.removeIf(defaults, new Predicate<AclEntry>() {
+      @Override
+      public boolean apply(AclEntry acl) {
+        if (! acl.getScope().equals(AclEntryScope.DEFAULT)) {
+          return true;
+        }
+        return false;
+      }
+    });
+
+    return defaults;
+  }
+
+  private List<AclEntry> removeDefaultAcls(List<AclEntry> acls) {
+    List<AclEntry> nonDefaults = null;
 
+    if (acls != null) {
+      nonDefaults = new ArrayList<AclEntry>(acls);
+      Iterables.removeIf(nonDefaults, new Predicate<AclEntry>() {
+        @Override
+        public boolean apply(AclEntry acl) {
+          if (acl.getScope().equals(AclEntryScope.DEFAULT)) {
+            return true;
+          }
+          return false;
+        }
+      });
+    }
+
+    return nonDefaults;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java
index d9d8251..0064bf6 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java
@@ -49,17 +49,15 @@ public class MiniCluster {
   private JobConf m_conf = null;
 
   private final static MiniCluster INSTANCE = new MiniCluster();
-  private static boolean isSetup = true;
+  private static boolean isSetup = false;
 
   private MiniCluster() {
-    setupMiniDfsAndMrClusters();
   }
 
-  private void setupMiniDfsAndMrClusters() {
+  private void setupMiniDfsAndMrClusters(Configuration config) {
     try {
       final int dataNodes = 1;     // There will be 4 data nodes
       final int taskTrackers = 1;  // There will be 4 task tracker nodes
-      Configuration config = new Configuration();
 
       // Builds and starts the mini dfs and mapreduce clusters
       if(System.getProperty("hadoop.log.dir") == null) {
@@ -97,8 +95,12 @@ public class MiniCluster {
    * mapreduce cluster.
    */
   public static MiniCluster buildCluster() {
+    return buildCluster(new Configuration());
+  }
+
+  public static MiniCluster buildCluster(Configuration config) {
     if (!isSetup) {
-      INSTANCE.setupMiniDfsAndMrClusters();
+      INSTANCE.setupMiniDfsAndMrClusters(config);
       isSetup = true;
     }
     return INSTANCE;
@@ -133,6 +135,10 @@ public class MiniCluster {
     m_mr = null;
   }
 
+  public Configuration getConfiguration() {
+    return new Configuration(m_conf);
+  }
+
   public Properties getProperties() {
     errorIfNotSetup();
     Properties properties = new Properties();

http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/itests/hcatalog-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index aa94a76..c5eda76 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -101,6 +101,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-it-util</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <!-- test inter-project -->
     <dependency>
       <groupId>commons-io</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/pig/TestExtendedAcls.java
----------------------------------------------------------------------
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/pig/TestExtendedAcls.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/pig/TestExtendedAcls.java
new file mode 100644
index 0000000..156665d
--- /dev/null
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/pig/TestExtendedAcls.java
@@ -0,0 +1,748 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.pig;
+
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
+import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.HcatTestUtils;
+import org.apache.hive.hcatalog.MiniCluster;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+public class TestExtendedAcls {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestExtendedAcls.class);
+  private static final String TEST_DATA_DIR = 
HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir")
+                                                + File.separator
+                                                + 
TestExtendedAcls.class.getCanonicalName()
+                                                + "-" + 
System.currentTimeMillis());
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + 
"/warehouse";
+  private static final String TEXT_DATA_FILE = TEST_DATA_DIR + 
"/basic.input.data";
+  private static final String DEFAULT_DATABASE_NAME = "test_acls";
+  private static final String TABLE_NAME_PREFIX = "test_acls_";
+
+  private static MiniCluster cluster = null;
+  private static FileSystem clusterFS = null;
+  private static Driver driver;
+  private static Random random = new Random();
+  private static Set<String> dbsCreated = Sets.newHashSet();
+
+  private static ImmutableList<AclEntry> dirSpec = ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar", FsAction.ALL),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar", FsAction.ALL));
+
+  private static ImmutableList<AclEntry> restrictedDirSpec = ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.NONE),
+      aclEntry(ACCESS, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.NONE),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE));
+
+  private static ImmutableList<AclEntry> dirSpec2 = ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "foo", FsAction.ALL),
+      aclEntry(ACCESS, USER, "foo2", FsAction.ALL), // No DEFAULT, so child 
should not inherit.
+      aclEntry(ACCESS, GROUP, "bar", FsAction.ALL),
+      aclEntry(ACCESS, GROUP, "bar2", FsAction.ALL), // No DEFAULT, so child 
should not inherit.
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar", FsAction.ALL));
+
+  private static ImmutableList<AclEntry> expectedDirSpec = ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, OTHER, FsAction.NONE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar", FsAction.ALL),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar", FsAction.ALL));
+
+  private static ImmutableList<AclEntry> expectedRestrictedDirSpec = 
ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.NONE),
+      aclEntry(ACCESS, OTHER, FsAction.NONE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.NONE),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE));
+
+  private static ImmutableList<AclEntry> expectedDirSpec2 = ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, OTHER, FsAction.NONE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "foo", FsAction.ALL),
+      aclEntry(ACCESS, GROUP, "bar", FsAction.ALL),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar", FsAction.ALL));
+
+  public TestExtendedAcls() {
+  }
+
+  @BeforeClass
+  public static void setupAllTests() throws Exception {
+    setUpCluster();
+    setUpLocalFileSystemDirectories();
+    setUpClusterFileSystemDirectories();
+    setUpHiveDriver();
+    createTextData();
+  }
+
+  @Before
+  public void setupSingleTest() throws Exception {
+  }
+
+  @AfterClass
+  public static void tearDownAllTests() throws Exception {
+    for (String db : dbsCreated) {
+      dropDatabase(db);
+    }
+    tearDownCluster();
+    cleanUpLocalFileSystemDirectories();
+  }
+
+  @Test
+  public void testUnpartitionedTable() throws IOException, 
CommandNeedRetryException, FileNotFoundException {
+    String dbName = DEFAULT_DATABASE_NAME;
+    String tableName = TABLE_NAME_PREFIX + "1";
+
+    Path warehouseDir = new Path(TEST_WAREHOUSE_DIR);
+    Path dbDir = new Path(warehouseDir, dbName + ".db");
+    Path tblDir = new Path(dbDir, tableName);
+    FileSystem fs = cluster.getFileSystem();
+
+    if (! dbsCreated.contains(dbName)) {
+      createDatabase(dbName);
+      dbsCreated.add(dbName);
+    }
+
+    // No extended ACLs on the database directory.
+    fs.setPermission(dbDir, new FsPermission((short) 0750));
+
+    createUnpartitionedTable(dbName, tableName);
+
+    // Extended ACLs on the table directory.
+    fs.setAcl(tblDir, dirSpec);
+
+    FileStatus fstat = fs.getFileStatus(tblDir);
+    List<AclEntry> acls = fs.getAclStatus(tblDir).getEntries();
+
+    verify(tblDir, new FsPermission((short) 0775), fstat.getPermission(), 
dirSpec, acls, false);
+
+    Properties props = new Properties();
+    props.setProperty("dfs.namenode.acls.enabled", "true");
+
+    PigServer server = getPigServer(props);
+    server.setBatchOn();
+    int i = 0;
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FOREACH A GENERATE a, b, c;", ++i);
+    server.registerQuery("STORE B INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+    server.executeBatch();
+
+    FileStatus[] fstats = fs.listStatus(tblDir);
+
+    // All files in the table directory should inherit table ACLs.
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+
+    for (FileStatus fstat1 : fstats) {
+      fs.delete(fstat1.getPath(), true);
+    }
+
+    fs.setAcl(tblDir, dirSpec2);
+
+    fstat = fs.getFileStatus(tblDir);
+    acls = fs.getAclStatus(tblDir).getEntries();
+
+    verify(tblDir, new FsPermission((short) 0775), fstat.getPermission(), 
dirSpec2, acls, false);
+
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FOREACH A GENERATE a, b, c;", ++i);
+    server.registerQuery("STORE B INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+    server.executeBatch();
+
+    fstats = fs.listStatus(tblDir);
+
+    // All files in the table directory should inherit table ACLs.
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+  }
+
+  @Test
+  public void testPartitionedTableStatic() throws IOException, 
CommandNeedRetryException, FileNotFoundException {
+    String dbName = DEFAULT_DATABASE_NAME;
+    String tableName = TABLE_NAME_PREFIX + "2";
+
+    Path warehouseDir = new Path(TEST_WAREHOUSE_DIR);
+    Path dbDir = new Path(warehouseDir, dbName + ".db");
+    Path tblDir = new Path(dbDir, tableName);
+    FileSystem fs = cluster.getFileSystem();
+
+    if (! dbsCreated.contains(dbName)) {
+      createDatabase(dbName);
+      dbsCreated.add(dbName);
+    }
+
+    // No extended ACLs on the database directory.
+    fs.setPermission(dbDir, new FsPermission((short) 0750));
+
+    createPartitionedTable(dbName, tableName);
+
+    // Extended ACLs on the table directory.
+    fs.setAcl(tblDir, dirSpec);
+
+    FileStatus fstat = fs.getFileStatus(tblDir);
+    List<AclEntry> acls = fs.getAclStatus(tblDir).getEntries();
+
+    verify(tblDir, new FsPermission((short) 0775), fstat.getPermission(), 
dirSpec, acls, false);
+
+    Properties props = new Properties();
+    props.setProperty("dfs.namenode.acls.enabled", "true");
+
+    PigServer server = getPigServer(props);
+    server.setBatchOn();
+    int i = 0;
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FOREACH A GENERATE a, b, c;", ++i);
+    server.registerQuery("STORE B INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer('dt=1,ds=1');", ++i);
+    server.executeBatch();
+
+    // Partition directories (dt=1/ds=1) and all files in the table directory 
should inherit table ACLs.
+    Path partDir = new Path(tblDir, "dt=1");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    partDir = new Path(partDir, "ds=1");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    FileStatus[] fstats = fs.listStatus(partDir);
+
+    Assert.assertTrue("Expected to find files in " + partDir, fstats.length > 
0);
+
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+
+    // Parent directory (dt=1) has restrictive ACLs compared to table.
+    // Child directory (dt=1/ds=2) should inherit from table instead of parent.
+    // NOTE: this behavior is different from hive and should be corrected to 
inherit from the parent instead.
+    partDir = new Path(tblDir, "dt=1");
+    fs.setAcl(partDir, restrictedDirSpec);
+
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0775), fstat.getPermission(), 
restrictedDirSpec, acls, false);
+
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FOREACH A GENERATE a, b, c;", ++i);
+    server.registerQuery("STORE B INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer('dt=1,ds=2');", ++i);
+    server.executeBatch();
+
+    partDir = new Path(partDir, "ds=2");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    fstats = fs.listStatus(partDir);
+
+    Assert.assertTrue("Expected to find files in " + partDir, fstats.length > 
0);
+
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+
+    partDir = new Path(tblDir, "dt=1");
+    fs.setAcl(partDir, dirSpec2);
+
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0775), fstat.getPermission(), 
dirSpec2, acls, false);
+
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FOREACH A GENERATE a, b, c;", ++i);
+    server.registerQuery("STORE B INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer('dt=1,ds=3');", ++i);
+    server.executeBatch();
+
+    partDir = new Path(partDir, "ds=3");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    fstats = fs.listStatus(partDir);
+
+    Assert.assertTrue("Expected to find files in " + partDir, fstats.length > 
0);
+
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+  }
+
+  @Test
+  public void testPartitionedTableDynamic() throws IOException, 
CommandNeedRetryException, FileNotFoundException {
+    String dbName = DEFAULT_DATABASE_NAME;
+    String tableName = TABLE_NAME_PREFIX + "3";
+
+    Path warehouseDir = new Path(TEST_WAREHOUSE_DIR);
+    Path dbDir = new Path(warehouseDir, dbName + ".db");
+    Path tblDir = new Path(dbDir, tableName);
+    FileSystem fs = cluster.getFileSystem();
+
+    if (! dbsCreated.contains(dbName)) {
+      createDatabase(dbName);
+      dbsCreated.add(dbName);
+    }
+
+    // No extended ACLs on the database directory.
+    fs.setPermission(dbDir, new FsPermission((short) 0750));
+
+    createPartitionedTable(dbName, tableName);
+
+    // Extended ACLs on the table directory.
+    fs.setAcl(tblDir, dirSpec);
+
+    FileStatus fstat = fs.getFileStatus(tblDir);
+    List<AclEntry> acls = fs.getAclStatus(tblDir).getEntries();
+
+    verify(tblDir, new FsPermission((short) 0775), fstat.getPermission(), 
dirSpec, acls, false);
+
+    Properties props = new Properties();
+    props.setProperty("hive.exec.dynamic.partition", "true");
+    props.setProperty("hive.exec.dynamic.partition.mode", "nonstrict");
+    props.setProperty("dfs.namenode.acls.enabled", "true");
+
+    PigServer server = getPigServer(props);
+    server.setBatchOn();
+    int i = 0;
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FILTER A BY dt == '1' AND ds == '1';", ++i);
+    server.registerQuery("C = FOREACH B GENERATE a, b, c, dt, ds;", ++i);
+    server.registerQuery("STORE C INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+    server.executeBatch();
+
+
+    // Currently, partition directories created on the dynamic partitioned 
code path are incorrect.
+    // Partition directories (dt=1/ds=1) and all files in the table directory 
should inherit table ACLs.
+    Path partDir = new Path(tblDir, "dt=1");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    // TODO: Must check this separately because intermediate partition 
directories are not created properly.
+    Assert.assertEquals("Expected permission to be 0755", new 
FsPermission((short) 0755), fstat.getPermission());
+    verifyAcls(partDir, dirSpec, acls, false);
+    //verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    // This directory is moved from the temporary location, so should match 
expectedDirSpec.
+    partDir = new Path(partDir, "ds=1");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    FileStatus[] fstats = fs.listStatus(partDir);
+
+    Assert.assertTrue("Expected to find files in " + partDir, fstats.length > 
0);
+
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+
+    // Parent directory (dt=1) has restrictive ACLs compared to table.
+    // Child directory (dt=1/ds=2) should inherit from table instead of parent.
+    // NOTE: this behavior is different from hive and should be corrected to 
inherit from the parent instead.
+    partDir = new Path(tblDir, "dt=1");
+    fs.setAcl(partDir, restrictedDirSpec);
+
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0775), fstat.getPermission(), 
restrictedDirSpec, acls, false);
+
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FILTER A BY dt == '1' AND ds == '2';", ++i);
+    server.registerQuery("C = FOREACH B GENERATE a, b, c, dt, ds;", ++i);
+    server.registerQuery("STORE C INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+    server.executeBatch();
+
+    partDir = new Path(partDir, "ds=2");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    fstats = fs.listStatus(partDir);
+
+    Assert.assertTrue("Expected to find files in " + partDir, fstats.length > 
0);
+
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+
+    partDir = new Path(tblDir, "dt=1");
+    fs.setAcl(partDir, dirSpec2);
+
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0775), fstat.getPermission(), 
dirSpec2, acls, false);
+
+    server.registerQuery("A = LOAD '" + TEXT_DATA_FILE + "' AS (a:int, 
b:chararray, c:chararray, dt:chararray, ds:chararray);", ++i);
+    server.registerQuery("B = FILTER A BY dt == '1' AND ds == '3';", ++i);
+    server.registerQuery("C = FOREACH B GENERATE a, b, c, dt, ds;", ++i);
+    server.registerQuery("STORE C INTO '" + dbName + "." + tableName + "' 
USING org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+    server.executeBatch();
+
+    partDir = new Path(partDir, "ds=3");
+    fstat = fs.getFileStatus(partDir);
+    acls = fs.getAclStatus(partDir).getEntries();
+
+    verify(partDir, new FsPermission((short) 0770), fstat.getPermission(), 
expectedDirSpec, acls, false);
+
+    fstats = fs.listStatus(partDir);
+
+    Assert.assertTrue("Expected to find files in " + partDir, fstats.length > 
0);
+
+    for (FileStatus fstat1 : fstats) {
+      acls = fs.getAclStatus(fstat1.getPath()).getEntries();
+      verify(fstat1.getPath(), new FsPermission((short) 0770), 
fstat1.getPermission(), expectedDirSpec, acls, true);
+    }
+  }
+
+  private static void setUpCluster() throws Exception {
+    Configuration config = new Configuration();
+    config.set("dfs.namenode.acls.enabled", "true");
+
+    cluster = MiniCluster.buildCluster(config);
+    clusterFS = cluster.getFileSystem();
+  }
+
+  private static void tearDownCluster() throws Exception {
+    cluster.shutDown();
+  }
+
+  private static void setUpLocalFileSystemDirectories() {
+    File f = new File(TEST_DATA_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if(!(new File(TEST_DATA_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create test-directory " + 
TEST_DATA_DIR + " on local filesystem.");
+    }
+  }
+
+  private static void cleanUpLocalFileSystemDirectories() {
+    File f = new File(TEST_DATA_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+  }
+
+  private static void setUpClusterFileSystemDirectories() throws IOException {
+    FileSystem clusterFS = cluster.getFileSystem();
+    Path warehouseDir = new Path(TEST_WAREHOUSE_DIR);
+    if (clusterFS.exists(warehouseDir)) {
+      clusterFS.delete(warehouseDir, true);
+    }
+    clusterFS.mkdirs(warehouseDir);
+  }
+
+  private static void setUpHiveDriver() throws IOException {
+    HiveConf hiveConf = createHiveConf();
+    driver = new Driver(hiveConf);
+    driver.setMaxRows(1000);
+    SessionState.start(new CliSessionState(hiveConf));
+  }
+
+  private static HiveConf createHiveConf() {
+    HiveConf hiveConf = new HiveConf(cluster.getConfiguration(), 
TestExtendedAcls.class);
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, 
TEST_WAREHOUSE_DIR);
+    return hiveConf;
+  }
+
+  /**
+   * Create data with schema:
+   *    number \t number \t filler_string \t partition-key (dt) \t 
partition-key (ds)
+   * @throws Exception
+   */
+  private static void createTextData() throws Exception {
+    int LOOP_SIZE = 100;
+    ArrayList<String> input = Lists.newArrayListWithExpectedSize((LOOP_SIZE+1) 
* LOOP_SIZE);
+    for (int i = 1; i <= LOOP_SIZE; i++) {
+      String si = i + "";
+      String sk = i % 2 == 0 ? "1" : "2";
+      for (int j = 1; j <= LOOP_SIZE; j++) {
+        String sj = "S" + j + "S";
+        String sm = Integer.toString((j % 3) + 1);
+        input.add(si + "\t" + (i*j) + "\t" + sj + "\t" + sk + "\t" + sm);
+      }
+    }
+
+    // Add nulls.
+    for (int i=0; i<LOOP_SIZE; ++i) {
+      String sk = i % 2 == 0 ? "1" : "2";
+      String sm = Integer.toString(((i % 6) >> 1) + 1);
+      input.add("\t" + "\t" + "S" + "_null_" + i + "_S" + "\t" + sk + "\t" + 
sm);
+    }
+
+    HcatTestUtils.createTestDataFile(TEXT_DATA_FILE, input.toArray(new 
String[input.size()]));
+    FileSystem fs = cluster.getFileSystem();
+    fs.copyFromLocalFile(new Path(TEXT_DATA_FILE), new Path(TEXT_DATA_FILE));
+  }
+
+  private void createDatabase(String dbName) throws IOException, 
CommandNeedRetryException {
+    String cmd = "CREATE DATABASE IF NOT EXISTS " + dbName;
+    executeStatementOnDriver(cmd);
+
+    cmd = "DESC DATABASE " + dbName;
+    executeStatementOnDriver(cmd);
+  }
+
+  private void createUnpartitionedTable(String dbName, String tableName) 
throws IOException, CommandNeedRetryException {
+    createTable(dbName, tableName, "a INT, b STRING, c STRING", null);
+  }
+
+  private void createPartitionedTable(String dbName, String tableName) throws 
IOException, CommandNeedRetryException {
+    createTable(dbName, tableName, "a INT, b STRING, c STRING", "dt STRING, ds 
STRING");
+  }
+
+  private void createTable(String dbName, String tableName, String schema, 
String partitionedBy)
+      throws IOException, CommandNeedRetryException {
+    String cmd = "CREATE TABLE " + dbName + "." + tableName + "(" + schema + 
") ";
+    if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
+      cmd = cmd + "PARTITIONED BY (" + partitionedBy + ") ";
+    }
+    executeStatementOnDriver(cmd);
+
+    cmd = "DESC FORMATTED " + dbName + "." + tableName;
+    executeStatementOnDriver(cmd);
+  }
+
+  /**
+   * Execute Hive CLI statement
+   * @param cmd arbitrary statement to execute
+   */
+  private void executeStatementOnDriver(String cmd) throws IOException, 
CommandNeedRetryException {
+    LOG.debug("Executing: " + cmd);
+    CommandProcessorResponse cpr = driver.run(cmd);
+    if(cpr.getResponseCode() != 0) {
+      throw new IOException("Failed to execute \"" + cmd + "\". " +
+          "Driver returned " + cpr.getResponseCode() +
+          " Error: " + cpr.getErrorMessage());
+    }
+  }
+
+  private static void dropDatabase(String dbName) throws IOException, 
CommandNeedRetryException {
+    driver.run("DROP DATABASE IF EXISTS " + dbName + " CASCADE");
+  }
+
+  private PigServer getPigServer() throws IOException {
+    return getPigServer(null);
+  }
+
+  private PigServer getPigServer(Properties props) throws IOException {
+    if (props != null) {
+      return new PigServer(ExecType.LOCAL, props);
+    } else {
+      return new PigServer(ExecType.LOCAL);
+    }
+  }
+
+  /**
+   * Create a new AclEntry with scope, type and permission (no name).
+   *
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @param permission FsAction set of permissions in the ACL entry
+   * @return new AclEntry
+   */
+  private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
+      FsAction permission) {
+    return new AclEntry.Builder().setScope(scope).setType(type)
+        .setPermission(permission).build();
+  }
+
+  /**
+   * Create a new AclEntry with scope, type, name and permission.
+   *
+   * @param scope AclEntryScope scope of the ACL entry
+   * @param type AclEntryType ACL entry type
+   * @param name String optional ACL entry name
+   * @param permission FsAction set of permissions in the ACL entry
+   * @return new AclEntry
+   */
+  private static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
+      String name, FsAction permission) {
+    return new AclEntry.Builder().setScope(scope).setType(type).setName(name)
+        .setPermission(permission).build();
+  }
+
+  private void verify(Path path, FsPermission expectedPerm, FsPermission 
actualPerm,
+      List<AclEntry> expectedAcls, List<AclEntry> actualAcls, boolean isFile) {
+    FsAction maskPerm = null;
+
+    LOG.debug("Verify permissions on " + path + " expected=" + expectedPerm + 
" " + expectedAcls + " actual=" + actualPerm + " " + actualAcls);
+
+    for (AclEntry expected : expectedAcls) {
+      if (expected.getType() == MASK && expected.getScope() == DEFAULT) {
+        maskPerm = expected.getPermission();
+        break;
+      }
+    }
+
+    Assert.assertTrue("Permissions on " + path + " differ: expected=" + 
expectedPerm + " actual=" + actualPerm,
+        expectedPerm.getUserAction() == actualPerm.getUserAction());
+    Assert.assertTrue("Permissions on " + path + " differ: expected=" + 
expectedPerm + " actual=" + actualPerm,
+        expectedPerm.getOtherAction() == actualPerm.getOtherAction());
+    Assert.assertTrue("Mask permissions on " + path + " differ: expected=" + 
maskPerm + " actual=" + actualPerm,
+        maskPerm == actualPerm.getGroupAction());
+    verifyAcls(path, expectedAcls, actualAcls, isFile);
+  }
+
+  private void verifyAcls(Path path, List<AclEntry> expectedAcls, 
List<AclEntry> actualAcls, boolean isFile) {
+    ArrayList<AclEntry> acls = new ArrayList<AclEntry>(actualAcls);
+
+    for (AclEntry expected : expectedAcls) {
+      if (! isFile && expected.getScope() == DEFAULT) {
+        boolean found = false;
+        for (AclEntry acl : acls) {
+          if (acl.equals(expected)) {
+            acls.remove(acl);
+            found = true;
+            break;
+          }
+        }
+
+        Assert.assertTrue("ACLs on " + path + " differ: " + expected + " 
expected=" + expectedAcls + " actual=" + actualAcls, found);
+      } else if (expected.getName() != null || expected.getType() == GROUP) {
+        // Check file permissions for non-named ACLs, except for GROUP.
+        if (isFile && expected.getScope() == DEFAULT) {
+          // Files should not have DEFAULT ACLs.
+          continue;
+        }
+
+        boolean found = false;
+        for (AclEntry acl : acls) {
+          if (acl.equals(expected)) {
+            acls.remove(acl);
+            found = true;
+            break;
+          }
+        }
+
+        Assert.assertTrue("ACLs on " + path + " differ: " + expected + " 
expected=" + expectedAcls + " actual=" + actualAcls, found);
+      }
+    }
+
+    Assert.assertTrue("More ACLs on " + path + " than expected: actual=" + 
acls, acls.size() == 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/988c491d/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestExtendedAcls.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestExtendedAcls.java
 
b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestExtendedAcls.java
index b798379..108eb99 100644
--- 
a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestExtendedAcls.java
+++ 
b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestExtendedAcls.java
@@ -18,13 +18,17 @@
 package org.apache.hadoop.hive.ql.security;
 
 import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
 import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
 import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
 import static org.apache.hadoop.fs.permission.AclEntryType.USER;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
@@ -34,11 +38,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 public class TestExtendedAcls extends FolderPermissionBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestExtendedAcls.class);
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -49,32 +56,152 @@ public class TestExtendedAcls extends FolderPermissionBase 
{
     baseSetup();
   }
 
-  private final ImmutableList<AclEntry> aclSpec1 = ImmutableList.of(
+  // ACLs for setting up directories and files.
+  private final ImmutableList<AclEntry> aclsForDirs1 = ImmutableList.of(
       aclEntry(ACCESS, USER, FsAction.ALL),
       aclEntry(ACCESS, GROUP, FsAction.ALL),
       aclEntry(ACCESS, OTHER, FsAction.ALL),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, GROUP, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, "bar", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, GROUP, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL));
+
+  private final ImmutableList<AclEntry> aclsForDirs2 = ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar2", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar2", FsAction.READ),
+      aclEntry(ACCESS, GROUP, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, "bar2", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, USER, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar2", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, GROUP, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL));
+
+  private final ImmutableList<AclEntry> aclsForFiles1 = ImmutableList.of(
+      aclEntry(ACCESS, USER, FsAction.ALL),
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, OTHER, FsAction.ALL),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
       aclEntry(ACCESS, USER, "bar", FsAction.READ_WRITE),
       aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
       aclEntry(ACCESS, GROUP, "bar", FsAction.READ_WRITE),
       aclEntry(ACCESS, GROUP, "foo", FsAction.READ_EXECUTE));
 
-  private final ImmutableList<AclEntry> aclSpec2 = ImmutableList.of(
+  private final ImmutableList<AclEntry> aclsForFiles2 = ImmutableList.of(
       aclEntry(ACCESS, USER, FsAction.ALL),
       aclEntry(ACCESS, GROUP, FsAction.ALL),
       aclEntry(ACCESS, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, MASK, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar2", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar2", FsAction.READ_WRITE),
+      aclEntry(ACCESS, GROUP, "foo2", FsAction.READ_EXECUTE));
+
+  // ACLs for verifying directories and files.
+  private final ImmutableList<AclEntry> aclDirSpec1 = ImmutableList.of(
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, GROUP, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.ALL),
+      aclEntry(DEFAULT, OTHER, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "bar", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, GROUP, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL));
+
+  private final ImmutableList<AclEntry> aclDirSpec2 = ImmutableList.of(
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
       aclEntry(ACCESS, USER, "bar2", FsAction.READ_WRITE),
       aclEntry(ACCESS, USER, "foo2", FsAction.READ_EXECUTE),
       aclEntry(ACCESS, GROUP, "bar2", FsAction.READ),
+      aclEntry(ACCESS, GROUP, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.ALL),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, "bar2", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, USER, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar2", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, GROUP, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL));
+
+  private final FsPermission perm1 = new FsPermission((short) 0777);
+  private final FsPermission perm2 = new FsPermission((short) 0775);
+
+  private final ImmutableList<AclEntry> aclFileSpec1 = ImmutableList.of(
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, GROUP, "foo", FsAction.READ_EXECUTE));
+
+  private final ImmutableList<AclEntry> aclFileSpec2 = ImmutableList.of(
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar2", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar2", FsAction.READ_WRITE),
       aclEntry(ACCESS, GROUP, "foo2", FsAction.READ_EXECUTE));
 
+  private final FsPermission permFile1 = new FsPermission((short) 0770);
+  private final FsPermission permFile2 = new FsPermission((short) 0770);
+
+  private final ImmutableList<AclEntry> aclInheritedSpec1 = ImmutableList.of(
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar", FsAction.READ_WRITE),
+      aclEntry(ACCESS, GROUP, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.ALL),
+      aclEntry(DEFAULT, OTHER, FsAction.ALL),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "bar", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, USER, "foo", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, GROUP, "foo", FsAction.READ_EXECUTE));
+
+  private final ImmutableList<AclEntry> aclInheritedSpec2 = ImmutableList.of(
+      aclEntry(ACCESS, GROUP, FsAction.ALL),
+      aclEntry(ACCESS, USER, "bar2", FsAction.READ_WRITE),
+      aclEntry(ACCESS, USER, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(ACCESS, GROUP, "bar2", FsAction.READ_WRITE),
+      aclEntry(ACCESS, GROUP, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, USER, "bar2", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, USER, FsAction.ALL),
+      aclEntry(DEFAULT, GROUP, FsAction.ALL),
+      aclEntry(DEFAULT, OTHER, FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, MASK, FsAction.ALL),
+      aclEntry(DEFAULT, USER, "foo2", FsAction.READ_EXECUTE),
+      aclEntry(DEFAULT, GROUP, "bar2", FsAction.READ_WRITE),
+      aclEntry(DEFAULT, GROUP, "foo2", FsAction.READ_EXECUTE));
+
+  private final FsPermission permInherited1 = new FsPermission((short) 0770);
+  private final FsPermission permInherited2 = new FsPermission((short) 0770);
+
   @Override
   public void setPermission(String locn, int permIndex) throws Exception {
+    Path path = new Path(locn);
+    FileStatus fstat = fs.getFileStatus(path);
+
     switch (permIndex) {
       case 0:
-        setAcl(locn, aclSpec1);
+        fs.setAcl(path, fstat.isDir() ? aclsForDirs1 : aclsForFiles1);
         break;
       case 1:
-        setAcl(locn, aclSpec2);
+        fs.setAcl(path, fstat.isDir() ? aclsForDirs2 : aclsForFiles2);
         break;
       default:
         throw new RuntimeException("Only 2 permissions by this test");
@@ -83,20 +210,50 @@ public class TestExtendedAcls extends FolderPermissionBase 
{
 
   @Override
   public void verifyPermission(String locn, int permIndex) throws Exception {
+    Path path = new Path(locn);
+    FileStatus fstat = fs.getFileStatus(path);
+    FsPermission perm = fstat.getPermission();
+    List<AclEntry> actual = null;
+
+    switch (permIndex) {
+      case 0:
+        actual = fs.getAclStatus(path).getEntries();
+        verify(path, perm1, perm, aclDirSpec1, actual);
+        break;
+      case 1:
+        actual = fs.getAclStatus(path).getEntries();
+        verify(path, perm2, perm, aclDirSpec2, actual);
+        break;
+      default:
+        throw new RuntimeException("Only 2 permissions by this test: " + 
permIndex);
+    }
+  }
+
+  @Override
+  public void verifyInheritedPermission(String locn, int permIndex) throws 
Exception {
+    Path path = new Path(locn);
+    FileStatus fstat = fs.getFileStatus(path);
+    FsPermission perm = fstat.getPermission();
+    List<AclEntry> acls = fs.getAclStatus(path).getEntries();
+    FsPermission expectedPerm = null;
+    List<AclEntry> expectedAcls = null;
+
     switch (permIndex) {
       case 0:
-        FsPermission perm = fs.getFileStatus(new Path(locn)).getPermission();
-        Assert.assertEquals("Location: " + locn, "rwxrwxrwx", 
String.valueOf(perm));
+        //Assert.assertEquals("Location: " + locn, "rwxrwxrwx", 
String.valueOf(perm));
+
+        expectedPerm = fstat.isFile() ? permFile1 : permInherited1;
+        expectedAcls = fstat.isFile() ? aclFileSpec1 : aclInheritedSpec1;
 
-        List<AclEntry> actual = getAcl(locn);
-        verifyAcls(aclSpec1, actual);
+        verify(path, expectedPerm, perm, expectedAcls, acls);
         break;
       case 1:
-        perm = fs.getFileStatus(new Path(locn)).getPermission();
-        Assert.assertEquals("Location: " + locn, "rwxrwxr-x", 
String.valueOf(perm));
+        //Assert.assertEquals("Location: " + locn, "rwxrwxr-x", 
String.valueOf(perm));
 
-        List<AclEntry> acls = getAcl(locn);
-        verifyAcls(aclSpec2, acls);
+        expectedPerm = fstat.isFile() ? permFile2 : permInherited2;
+        expectedAcls = fstat.isFile() ? aclFileSpec2 : aclInheritedSpec2;
+
+        verify(path, expectedPerm, perm, expectedAcls, acls);
         break;
       default:
         throw new RuntimeException("Only 2 permissions by this test: " + 
permIndex);
@@ -139,25 +296,42 @@ public class TestExtendedAcls extends 
FolderPermissionBase {
         .setPermission(permission).build();
   }
 
-  private void verifyAcls(List<AclEntry> expectedList, List<AclEntry> 
actualList) {
-    for (AclEntry expected : expectedList) {
-      if (expected.getName() != null) {
-        //the non-named acl's are coming as regular permission, and not as 
aclEntries.
-        boolean found = false;
-        for (AclEntry actual : actualList) {
-          if (actual.equals(expected)) {
-            found = true;
-          }
-        }
-        if (!found) {
-          Assert.fail("Following Acl does not have a match: " + expected);
+  private void verify(Path path, FsPermission expectedPerm, FsPermission 
actualPerm,
+      List<AclEntry> expectedAcls, List<AclEntry> actualAcls) {
+    verifyPermissions(path, expectedPerm, actualPerm);
+    verifyAcls(path, expectedAcls, actualAcls);
+  }
+
+  private void verifyPermissions(Path path, FsPermission expected, 
FsPermission actual) {
+    LOG.debug("Verify permissions on " + path + " expected=" + expected + " 
actual=" + actual);
+
+    Assert.assertTrue("User permissions on " + path + " differ: expected=" + 
expected + " actual=" + actual,
+        expected.getUserAction() == actual.getUserAction());
+    Assert.assertTrue("Group/Mask permissions on " + path + " differ: 
expected=" + expected + " actual=" + actual,
+        expected.getGroupAction() == actual.getGroupAction());
+    Assert.assertTrue("Other permissions on " + path + " differ: expected=" + 
expected + " actual=" + actual,
+        expected.getOtherAction() == actual.getOtherAction());
+  }
+
+  private void verifyAcls(Path path, List<AclEntry> expected, List<AclEntry> 
actual) {
+    LOG.debug("Verify ACLs on " + path + " expected=" + expected + " actual=" 
+ actual);
+
+    ArrayList<AclEntry> acls = new ArrayList<AclEntry>(actual);
+
+    for (AclEntry expectedAcl : expected) {
+      boolean found = false;
+      for (AclEntry acl : acls) {
+        if (acl.equals(expectedAcl)) {
+          acls.remove(acl);
+          found = true;
+          break;
         }
       }
+
+      Assert.assertTrue("ACLs on " + path + " differ: " + expectedAcl + " 
expected=" + expected + " actual=" + actual, found);
     }
-  }
 
-  private void setAcl(String locn, List<AclEntry> aclSpec) throws Exception {
-    fs.setAcl(new Path(locn), aclSpec);
+    Assert.assertTrue("ACLs on " + path + " are more than expected: expected=" 
+ expected + " actual=" + actual, acls.size() == 0);
   }
 
   private List<AclEntry> getAcl(String locn) throws Exception {

Reply via email to