This is an automated email from the ASF dual-hosted git repository.

meiyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 7294af8  HBASE-22580 Add a table attribute to make user scan snapshot 
feature configurable for table (#336)
7294af8 is described below

commit 7294af8b1514d8bf8a2ddb8211be310b9c0e8f5b
Author: meiyi <myime...@gmail.com>
AuthorDate: Mon Jul 29 09:51:26 2019 +0800

    HBASE-22580 Add a table attribute to make user scan snapshot feature 
configurable for table (#336)
---
 .../hbase/security/access/PermissionStorage.java   |   5 +
 .../access/SnapshotScannerHDFSAclController.java   | 406 +++++++++++++--------
 .../access/SnapshotScannerHDFSAclHelper.java       | 187 ++++++----
 .../TestSnapshotScannerHDFSAclController.java      | 171 +++++++--
 4 files changed, 523 insertions(+), 246 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java
index 6d37fe5..2a9b195 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/PermissionStorage.java
@@ -508,6 +508,11 @@ public final class PermissionStorage {
       false);
   }
 
+  public static ListMultimap<String, UserPermission> 
getGlobalPermissions(Configuration conf)
+      throws IOException {
+    return getPermissions(conf, null, null, null, null, null, false);
+  }
+
   /**
    * Reads user permission assignments stored in the <code>l:</code> column 
family of the first
    * table row in <code>_acl_</code>.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
index c964b67..f6d5b76 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.Permission.Action;
 import 
org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -66,12 +65,15 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
 /**
  * Set HDFS ACLs to hFiles to make HBase granted users have permission to scan 
snapshot
  * <p>
  * To use this feature, please mask sure HDFS config:
  * <ul>
- * <li>dfs.permissions.enabled = true</li>
+ * <li>dfs.namenode.acls.enabled = true</li>
  * <li>fs.permissions.umask-mode = 027 (or smaller umask than 027)</li>
  * </ul>
  * </p>
@@ -102,8 +104,9 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
 
   private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
   private PathHelper pathHelper = null;
-  private FileSystem fs = null;
+  private MasterServices masterServices = null;
   private volatile boolean initialized = false;
+  private volatile boolean aclTableInitialized = false;
   /** Provider for mapping principal names to Users */
   private UserProvider userProvider;
 
@@ -121,11 +124,10 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
       if (!(mEnv instanceof HasMasterServices)) {
         throw new IOException("Does not implement HMasterServices");
       }
-      MasterServices masterServices = ((HasMasterServices) 
mEnv).getMasterServices();
+      masterServices = ((HasMasterServices) mEnv).getMasterServices();
       hdfsAclHelper = new 
SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
           masterServices.getConnection());
       pathHelper = hdfsAclHelper.getPathHelper();
-      fs = pathHelper.getFileSystem();
       hdfsAclHelper.setCommonDirectoryPermission();
       initialized = true;
       userProvider = 
UserProvider.instantiate(c.getEnvironment().getConfiguration());
@@ -138,32 +140,33 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
 
   @Override
   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> c) 
throws IOException {
-    if (checkInitialized()) {
-      try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
-        if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
-          // Check if hbase acl table has 'm' CF, if not, add 'm' CF
-          TableDescriptor tableDescriptor = 
admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
-          boolean containHdfsAclFamily =
-              
Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(family -> Bytes
-                  .equals(family.getName(), 
SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
-          if (!containHdfsAclFamily) {
-            TableDescriptorBuilder builder = 
TableDescriptorBuilder.newBuilder(tableDescriptor)
-                .setColumnFamily(ColumnFamilyDescriptorBuilder
-                    
.newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
-            admin.modifyTable(builder.build());
-          }
-        } else {
-          throw new TableNotFoundException("Table " + 
PermissionStorage.ACL_TABLE_NAME
-              + " is not created yet. Please check if " + getClass().getName()
-              + " is configured after " + AccessController.class.getName());
+    if (!initialized) {
+      return;
+    }
+    try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
+      if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
+        // Check if acl table has 'm' CF, if not, add 'm' CF
+        TableDescriptor tableDescriptor = 
admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
+        boolean containHdfsAclFamily = 
Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(
+          family -> Bytes.equals(family.getName(), 
SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
+        if (!containHdfsAclFamily) {
+          TableDescriptorBuilder builder = 
TableDescriptorBuilder.newBuilder(tableDescriptor)
+              .setColumnFamily(ColumnFamilyDescriptorBuilder
+                  
.newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
+          admin.modifyTable(builder.build());
         }
+        aclTableInitialized = true;
+      } else {
+        throw new TableNotFoundException("Table " + 
PermissionStorage.ACL_TABLE_NAME
+            + " is not created yet. Please check if " + getClass().getName()
+            + " is configured after " + AccessController.class.getName());
       }
     }
   }
 
   @Override
   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) {
-    if (checkInitialized()) {
+    if (initialized) {
       hdfsAclHelper.close();
     }
   }
@@ -171,34 +174,28 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
   @Override
   public void 
postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
       TableDescriptor desc, RegionInfo[] regions) throws IOException {
-    if (!desc.getTableName().isSystemTable() && checkInitialized()) {
+    if (needHandleTableHdfsAcl(desc, "createTable " + desc.getTableName())) {
       TableName tableName = desc.getTableName();
-      List<Path> paths = hdfsAclHelper.getTableRootPaths(tableName, false);
-      for (Path path : paths) {
-        if (!fs.exists(path)) {
-          fs.mkdirs(path);
-        }
-      }
-      // Add table owner HDFS acls
+      // 1. Create table directories to make HDFS acls can be inherited
+      hdfsAclHelper.createTableDirectories(tableName);
+      // 2. Add table owner HDFS acls
       String owner =
           desc.getOwnerString() == null ? getActiveUser(c).getShortName() : 
desc.getOwnerString();
-      hdfsAclHelper.addTableAcl(tableName, owner);
-      try (Table aclTable =
-          
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
-        SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, owner, 
tableName);
-      }
+      hdfsAclHelper.addTableAcl(tableName, Sets.newHashSet(owner), "create");
+      // 3. Record table owner permission is synced to HDFS in acl table
+      
SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(c.getEnvironment().getConnection(),
 owner,
+        tableName);
     }
   }
 
   @Override
   public void 
postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> c,
       NamespaceDescriptor ns) throws IOException {
-    if (checkInitialized()) {
+    if (checkInitialized("createNamespace " + ns.getName())) {
+      // Create namespace directories to make HDFS acls can be inherited
       List<Path> paths = hdfsAclHelper.getNamespaceRootPaths(ns.getName());
       for (Path path : paths) {
-        if (!fs.exists(path)) {
-          fs.mkdirs(path);
-        }
+        hdfsAclHelper.createDirIfNotExist(path);
       }
     }
   }
@@ -206,7 +203,8 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
   @Override
   public void 
postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
       SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws 
IOException {
-    if (!tableDescriptor.getTableName().isSystemTable() && checkInitialized()) 
{
+    if (needHandleTableHdfsAcl(tableDescriptor, "snapshot " + 
snapshot.getName())) {
+      // Add HDFS acls of users with table read permission to snapshot files
       hdfsAclHelper.snapshotAcl(snapshot);
     }
   }
@@ -214,51 +212,76 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
   @Override
   public void 
postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> 
c,
       TableName tableName) throws IOException {
-    if (!tableName.isSystemTable() && checkInitialized()) {
-      hdfsAclHelper.resetTableAcl(tableName);
+    if (needHandleTableHdfsAcl(tableName, "truncateTable " + tableName)) {
+      // Since the table directories is recreated, so add HDFS acls again
+      Set<String> users = hdfsAclHelper.getUsersWithTableReadAction(tableName, 
false, false);
+      hdfsAclHelper.addTableAcl(tableName, users, "truncate");
     }
   }
 
   @Override
   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> 
ctx,
       TableName tableName) throws IOException {
-    if (!tableName.isSystemTable() && checkInitialized()) {
+    if (needHandleTableHdfsAcl(tableName, "deleteTable " + tableName)) {
       /*
-       * remove table user access HDFS acl from namespace directory if the 
user has no permissions
+       * Remove table user access HDFS acl from namespace directory if the 
user has no permissions
        * of global, ns of the table or other tables of the ns, eg: Bob has 
'ns1:t1' read permission,
        * when delete 'ns1:t1', if Bob has global read permission, '@ns1' read 
permission or
        * 'ns1:other_tables' read permission, then skip remove Bob access acl 
in ns1Dirs, otherwise,
        * remove Bob access acl.
        */
-      Set<String> removeUsers = new HashSet<>();
       try (Table aclTable =
           
ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME))
 {
-        List<String> users = 
SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
+        Set<String> users = 
SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
+        // 1. Delete table owner permission is synced to HDFS in acl table
         SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
-        byte[] namespace = tableName.getNamespace();
-        for (String user : users) {
-          List<byte[]> userEntries = 
SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
-          boolean remove = true;
-          for (byte[] entry : userEntries) {
-            if (PermissionStorage.isGlobalEntry(entry)) {
-              remove = false;
-              break;
-            } else if (PermissionStorage.isNamespaceEntry(entry)
-                && Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), 
namespace)) {
-              remove = false;
-              break;
-            } else if (Bytes.equals(TableName.valueOf(entry).getNamespace(), 
namespace)) {
-              remove = false;
-              break;
-            }
-          }
-          if (remove) {
-            removeUsers.add(user);
-          }
+        // 2. Remove namespace access acls
+        Set<String> removeUsers = filterUsersToRemoveNsAccessAcl(aclTable, 
tableName, users);
+        if (removeUsers.size() > 0) {
+          hdfsAclHelper.removeNamespaceAccessAcl(tableName, removeUsers, 
"delete");
         }
       }
-      if (removeUsers.size() > 0) {
-        hdfsAclHelper.removeNamespaceAcl(tableName, removeUsers);
+    }
+  }
+
+  @Override
+  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> 
ctx,
+      TableName tableName, TableDescriptor oldDescriptor, TableDescriptor 
currentDescriptor)
+      throws IOException {
+    try (Table aclTable =
+        
ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME))
 {
+      if (needHandleTableHdfsAcl(currentDescriptor, "modifyTable " + tableName)
+          && !hdfsAclHelper.isTableUserScanSnapshotEnabled(oldDescriptor)) {
+        // 1. Create table directories used for acl inherited
+        hdfsAclHelper.createTableDirectories(tableName);
+        // 2. Add table users HDFS acls
+        Set<String> tableUsers = 
hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
+        Set<String> users =
+            
hdfsAclHelper.getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), 
true);
+        users.addAll(tableUsers);
+        hdfsAclHelper.addTableAcl(tableName, users, "modify");
+        // 3. Record table user acls are synced to HDFS in acl table
+        
SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
+          tableUsers, tableName);
+      } else if (needHandleTableHdfsAcl(oldDescriptor, "modifyTable " + 
tableName)
+          && !hdfsAclHelper.isTableUserScanSnapshotEnabled(currentDescriptor)) 
{
+        // 1. Remove empty table directories
+        List<Path> tableRootPaths = hdfsAclHelper.getTableRootPaths(tableName, 
false);
+        for (Path path : tableRootPaths) {
+          hdfsAclHelper.deleteEmptyDir(path);
+        }
+        // 2. Remove all table HDFS acls
+        Set<String> tableUsers = 
hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
+        Set<String> users = hdfsAclHelper
+            .getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), 
true);
+        users.addAll(tableUsers);
+        hdfsAclHelper.removeTableAcl(tableName, users);
+        // 3. Remove namespace access HDFS acls for users who only own 
permission for this table
+        hdfsAclHelper.removeNamespaceAccessAcl(tableName,
+          filterUsersToRemoveNsAccessAcl(aclTable, tableName, tableUsers), 
"modify");
+        // 4. Record table user acl is not synced to HDFS
+        
SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
+          tableUsers, tableName);
       }
     }
   }
@@ -266,33 +289,26 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
   @Override
   public void 
postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
       String namespace) throws IOException {
-    if (checkInitialized()) {
-      try (Table aclTable =
-          
ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME))
 {
-        SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(aclTable, 
namespace);
-      }
+    if (checkInitialized("deleteNamespace " + namespace)) {
+      // 1. Record namespace user acl is not synced to HDFS
+      
SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(ctx.getEnvironment().getConnection(),
+        namespace);
+      // 2. Delete tmp namespace directory
       /**
        * Delete namespace tmp directory because it's created by this 
coprocessor when namespace is
        * created to make namespace default acl can be inherited by tables. The 
namespace data
        * directory is deleted by DeleteNamespaceProcedure, the namespace 
archive directory is
        * deleted by HFileCleaner.
        */
-      Path tmpNsDir = pathHelper.getTmpNsDir(namespace);
-      if (fs.exists(tmpNsDir)) {
-        if (fs.listStatus(tmpNsDir).length == 0) {
-          fs.delete(tmpNsDir, false);
-        } else {
-          LOG.error("The tmp directory {} of namespace {} is not empty after 
delete namespace",
-            tmpNsDir, namespace);
-        }
-      }
+      hdfsAclHelper.deleteEmptyDir(pathHelper.getTmpNsDir(namespace));
     }
   }
 
   @Override
   public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
       UserPermission userPermission, boolean mergeExistingPermissions) throws 
IOException {
-    if (!checkInitialized()) {
+    if (!checkInitialized(
+      "grant " + userPermission + ", merge existing permissions " + 
mergeExistingPermissions)) {
       return;
     }
     try (Table aclTable =
@@ -302,15 +318,18 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
       switch (userPermission.getAccessScope()) {
         case GLOBAL:
           UserPermission perm = getUserGlobalPermission(conf, userName);
-          if (perm != null && containReadPermission(perm)) {
+          if (perm != null && hdfsAclHelper.containReadAction(perm)) {
             if (!isHdfsAclSet(aclTable, userName)) {
-              Pair<Set<String>, Set<TableName>> namespaceAndTable =
-                      
SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
-              Set<String> skipNamespaces = namespaceAndTable.getFirst();
-              Set<TableName> skipTables = 
namespaceAndTable.getSecond().stream()
-                      .filter(t -> 
!skipNamespaces.contains(t.getNamespaceAsString()))
-                      .collect(Collectors.toSet());
+              // 1. Get namespaces and tables which global user acls are 
already synced
+              Pair<Set<String>, Set<TableName>> skipNamespaceAndTables =
+                  
SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
+              Set<String> skipNamespaces = skipNamespaceAndTables.getFirst();
+              Set<TableName> skipTables = 
skipNamespaceAndTables.getSecond().stream()
+                  .filter(t -> 
!skipNamespaces.contains(t.getNamespaceAsString()))
+                  .collect(Collectors.toSet());
+              // 2. Add HDFS acl(skip namespaces and tables directories whose 
acl is set)
               hdfsAclHelper.grantAcl(userPermission, skipNamespaces, 
skipTables);
+              // 3. Record global acl is sync to HDFS
               SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, 
userName);
             }
           } else {
@@ -322,12 +341,15 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
         case NAMESPACE:
           String namespace = ((NamespacePermission) 
userPermission.getPermission()).getNamespace();
           UserPermission nsPerm = getUserNamespacePermission(conf, userName, 
namespace);
-          if (nsPerm != null && containReadPermission(nsPerm)) {
+          if (nsPerm != null && hdfsAclHelper.containReadAction(nsPerm)) {
             if (!isHdfsAclSet(aclTable, userName, namespace)) {
+              // 1. Get tables which namespace user acls are already synced
               Set<TableName> skipTables = SnapshotScannerHDFSAclStorage
-                      .getUserNamespaceAndTable(aclTable, 
userName).getSecond();
+                  .getUserNamespaceAndTable(aclTable, userName).getSecond();
+              // 2. Add HDFS acl(skip tables directories whose acl is set)
               hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), 
skipTables);
             }
+            // 3. Record namespace acl is synced to HDFS
             SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, 
userName, namespace);
           } else {
             // The merged user permission doesn't contain READ, so remove user 
namespace HDFS acls
@@ -336,24 +358,23 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
           }
           break;
         case TABLE:
-          TableName tableName = ((TablePermission) 
userPermission.getPermission()).getTableName();
-          UserPermission tPerm = getUserTablePermission(conf, userName, 
tableName);
-          if (tPerm != null) {
-            TablePermission tablePermission = (TablePermission) 
tPerm.getPermission();
-            if (tablePermission.hasFamily() || tablePermission.hasQualifier()) 
{
-              break;
+          TablePermission tablePerm = (TablePermission) 
userPermission.getPermission();
+          if (needHandleTableHdfsAcl(tablePerm)) {
+            TableName tableName = tablePerm.getTableName();
+            UserPermission tPerm = getUserTablePermission(conf, userName, 
tableName);
+            if (tPerm != null && hdfsAclHelper.containReadAction(tPerm)) {
+              if (!isHdfsAclSet(aclTable, userName, tableName)) {
+                // 1. Add HDFS acl
+                hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new 
HashSet<>(0));
+              }
+              // 2. Record table acl is synced to HDFS
+              SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, 
userName, tableName);
+            } else {
+              // The merged user permission doesn't contain READ, so remove 
user table HDFS acls if
+              // it's set
+              removeUserTableHdfsAcl(aclTable, userName, tableName, 
userPermission);
             }
           }
-          if (tPerm != null && containReadPermission(tPerm)) {
-            if (!isHdfsAclSet(aclTable, userName, tableName)) {
-              hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new 
HashSet<>(0));
-            }
-            SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, 
userName, tableName);
-          } else {
-            // The merged user permission doesn't contain READ, so remove user 
table HDFS acls if
-            // it's set
-            removeUserTableHdfsAcl(aclTable, userName, tableName, 
userPermission);
-          }
           break;
         default:
           throw new IllegalArgumentException(
@@ -365,7 +386,7 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
   @Override
   public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
       UserPermission userPermission) throws IOException {
-    if (checkInitialized()) {
+    if (checkInitialized("revoke " + userPermission)) {
       try (Table aclTable =
           
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
         String userName = userPermission.getUser();
@@ -373,7 +394,7 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
         switch (userPermission.getAccessScope()) {
           case GLOBAL:
             UserPermission userGlobalPerm = getUserGlobalPermission(conf, 
userName);
-            if (userGlobalPerm == null || 
!containReadPermission(userGlobalPerm)) {
+            if (userGlobalPerm == null || 
!hdfsAclHelper.containReadAction(userGlobalPerm)) {
               removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
             }
             break;
@@ -381,16 +402,18 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
             NamespacePermission nsPerm = (NamespacePermission) 
userPermission.getPermission();
             UserPermission userNsPerm =
                 getUserNamespacePermission(conf, userName, 
nsPerm.getNamespace());
-            if (userNsPerm == null || !containReadPermission(userNsPerm)) {
+            if (userNsPerm == null || 
!hdfsAclHelper.containReadAction(userNsPerm)) {
               removeUserNamespaceHdfsAcl(aclTable, userName, 
nsPerm.getNamespace(), userPermission);
             }
             break;
           case TABLE:
             TablePermission tPerm = (TablePermission) 
userPermission.getPermission();
-            UserPermission userTablePerm =
-                getUserTablePermission(conf, userName, tPerm.getTableName());
-            if (userTablePerm == null || 
!containReadPermission(userTablePerm)) {
-              removeUserTableHdfsAcl(aclTable, userName, tPerm.getTableName(), 
userPermission);
+            if (needHandleTableHdfsAcl(tPerm)) {
+              TableName tableName = tPerm.getTableName();
+              UserPermission userTablePerm = getUserTablePermission(conf, 
userName, tableName);
+              if (userTablePerm == null || 
!hdfsAclHelper.containReadAction(userTablePerm)) {
+                removeUserTableHdfsAcl(aclTable, userName, tableName, 
userPermission);
+              }
             }
             break;
           default:
@@ -404,27 +427,32 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
   private void removeUserGlobalHdfsAcl(Table aclTable, String userName,
       UserPermission userPermission) throws IOException {
     if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, 
userName)) {
-      // remove user global acls but reserve ns and table acls
+      // 1. Get namespaces and tables which global user acls are already synced
       Pair<Set<String>, Set<TableName>> namespaceAndTable =
           SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, 
userName);
       Set<String> skipNamespaces = namespaceAndTable.getFirst();
       Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
           .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
           .collect(Collectors.toSet());
+      // 2. Remove user HDFS acls(skip namespaces and tables directories
+      // whose acl must be reversed)
       hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, skipTables);
+      // 3. Remove global user acl is synced to HDFS in acl table
       SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, 
userName);
     }
   }
 
   private void removeUserNamespaceHdfsAcl(Table aclTable, String userName, 
String namespace,
       UserPermission userPermission) throws IOException {
-    // remove user ns acls but reserve table acls
     if (SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, 
userName, namespace)) {
       if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, 
userName)) {
+        // 1. Get tables whose namespace user acls are already synced
         Set<TableName> skipTables =
             SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, 
userName).getSecond();
+        // 2. Remove user HDFS acls(skip tables directories whose acl must be 
reversed)
         hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
       }
+      // 3. Remove namespace user acl is synced to HDFS in acl table
       SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, 
userName, namespace);
     }
   }
@@ -435,49 +463,36 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
       if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, 
userName)
           && !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, 
userName,
             tableName.getNamespaceAsString())) {
-        // remove table acls
+        // 1. Remove table acls
         hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new 
HashSet<>(0));
       }
+      // 2. Remove table user acl is synced to HDFS in acl table
       SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, 
tableName);
     }
   }
 
-  private boolean containReadPermission(UserPermission userPermission) {
-    if (userPermission != null) {
-      return Arrays.stream(userPermission.getPermission().getActions())
-          .anyMatch(action -> action == Action.READ);
-    }
-    return false;
-  }
-
   private UserPermission getUserGlobalPermission(Configuration conf, String 
userName)
       throws IOException {
     List<UserPermission> permissions = 
PermissionStorage.getUserPermissions(conf,
       PermissionStorage.ACL_GLOBAL_NAME, null, null, userName, true);
-    if (permissions != null && permissions.size() > 0) {
-      return permissions.get(0);
-    }
-    return null;
+    return permissions.size() > 0 ? permissions.get(0) : null;
   }
 
   private UserPermission getUserNamespacePermission(Configuration conf, String 
userName,
       String namespace) throws IOException {
     List<UserPermission> permissions =
         PermissionStorage.getUserNamespacePermissions(conf, namespace, 
userName, true);
-    if (permissions != null && permissions.size() > 0) {
-      return permissions.get(0);
-    }
-    return null;
+    return permissions.size() > 0 ? permissions.get(0) : null;
   }
 
   private UserPermission getUserTablePermission(Configuration conf, String 
userName,
       TableName tableName) throws IOException {
-    List<UserPermission> permissions =
-        PermissionStorage.getUserTablePermissions(conf, tableName, null, null, 
userName, true);
-    if (permissions != null && permissions.size() > 0) {
-      return permissions.get(0);
-    }
-    return null;
+    List<UserPermission> permissions = PermissionStorage
+        .getUserTablePermissions(conf, tableName, null, null, userName, 
true).stream()
+        .filter(userPermission -> hdfsAclHelper
+            .isNotFamilyOrQualifierPermission((TablePermission) 
userPermission.getPermission()))
+        .collect(Collectors.toList());
+    return permissions.size() > 0 ? permissions.get(0) : null;
   }
 
   private boolean isHdfsAclSet(Table aclTable, String userName) throws 
IOException {
@@ -495,7 +510,7 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
   }
 
   /**
-   * Check if user global/namespace/table HDFS acls is already set to hfile
+   * Check if user global/namespace/table HDFS acls is already set
    */
   private boolean isHdfsAclSet(Table aclTable, String userName, String 
namespace,
       TableName tableName) throws IOException {
@@ -513,12 +528,32 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
     return isSet;
   }
 
-  private boolean checkInitialized() {
+  @VisibleForTesting
+  boolean checkInitialized(String operation) {
     if (initialized) {
-      return true;
-    } else {
-      return false;
+      if (aclTableInitialized) {
+        return true;
+      } else {
+        LOG.warn("Skip set HDFS acls because acl table is not initialized when 
" + operation);
+      }
     }
+    return false;
+  }
+
+  private boolean needHandleTableHdfsAcl(TablePermission tablePermission) 
throws IOException {
+    return needHandleTableHdfsAcl(tablePermission.getTableName(), "")
+        && hdfsAclHelper.isNotFamilyOrQualifierPermission(tablePermission);
+  }
+
+  private boolean needHandleTableHdfsAcl(TableName tableName, String 
operation) throws IOException {
+    return !tableName.isSystemTable() && checkInitialized(operation) && 
hdfsAclHelper
+        
.isTableUserScanSnapshotEnabled(masterServices.getTableDescriptors().get(tableName));
+  }
+
+  private boolean needHandleTableHdfsAcl(TableDescriptor tableDescriptor, 
String operation) {
+    TableName tableName = tableDescriptor.getTableName();
+    return !tableName.isSystemTable() && checkInitialized(operation)
+        && hdfsAclHelper.isTableUserScanSnapshotEnabled(tableDescriptor);
   }
 
   private User getActiveUser(ObserverContext<?> ctx) throws IOException {
@@ -530,6 +565,42 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
     return userProvider.getCurrent();
   }
 
+  /**
+   * Remove table user access HDFS acl from namespace directory if the user 
has no permissions of
+   * global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' 
read permission, when
+   * delete 'ns1:t1', if Bob has global read permission, '@ns1' read 
permission or
+   * 'ns1:other_tables' read permission, then skip remove Bob access acl in 
ns1Dirs, otherwise,
+   * remove Bob access acl.
+   * @param aclTable acl table
+   * @param tableName the name of the table
+   * @param tablesUsers table users set
+   * @return users whose access acl will be removed from the namespace of the 
table
+   * @throws IOException if an error occurred
+   */
+  private Set<String> filterUsersToRemoveNsAccessAcl(Table aclTable, TableName 
tableName,
+      Set<String> tablesUsers) throws IOException {
+    Set<String> removeUsers = new HashSet<>();
+    byte[] namespace = tableName.getNamespace();
+    for (String user : tablesUsers) {
+      List<byte[]> userEntries = 
SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
+      boolean remove = true;
+      for (byte[] entry : userEntries) {
+        if (PermissionStorage.isGlobalEntry(entry)
+            || (PermissionStorage.isNamespaceEntry(entry)
+                && Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), 
namespace))
+            || (!Bytes.equals(tableName.getName(), entry)
+                && Bytes.equals(TableName.valueOf(entry).getNamespace(), 
namespace))) {
+          remove = false;
+          break;
+        }
+      }
+      if (remove) {
+        removeUsers.add(user);
+      }
+    }
+    return removeUsers;
+  }
+
   static final class SnapshotScannerHDFSAclStorage {
     /**
      * Add a new CF in HBase acl table to record if the HBase read permission 
is synchronized to
@@ -554,6 +625,22 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
       addUserEntry(aclTable, user, 
Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
     }
 
+    static void addUserTableHdfsAcl(Connection connection, Set<String> users, 
TableName tableName)
+        throws IOException {
+      try (Table aclTable = 
connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        for (String user : users) {
+          addUserTableHdfsAcl(aclTable, user, tableName);
+        }
+      }
+    }
+
+    static void addUserTableHdfsAcl(Connection connection, String user, 
TableName tableName)
+        throws IOException {
+      try (Table aclTable = 
connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        addUserTableHdfsAcl(aclTable, user, tableName);
+      }
+    }
+
     static void addUserTableHdfsAcl(Table aclTable, String user, TableName 
tableName)
         throws IOException {
       addUserEntry(aclTable, user, tableName.getName());
@@ -579,6 +666,15 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
       deleteUserEntry(aclTable, user, tableName.getName());
     }
 
+    static void deleteUserTableHdfsAcl(Connection connection, Set<String> 
users,
+        TableName tableName) throws IOException {
+      try (Table aclTable = 
connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        for (String user : users) {
+          deleteUserTableHdfsAcl(aclTable, user, tableName);
+        }
+      }
+    }
+
     private static void deleteUserEntry(Table aclTable, String user, byte[] 
entry)
         throws IOException {
       Delete delete = new Delete(entry);
@@ -586,8 +682,10 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
       aclTable.delete(delete);
     }
 
-    static void deleteNamespaceHdfsAcl(Table aclTable, String namespace) 
throws IOException {
-      deleteEntry(aclTable, 
Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
+    static void deleteNamespaceHdfsAcl(Connection connection, String 
namespace) throws IOException {
+      try (Table aclTable = 
connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
+        deleteEntry(aclTable, 
Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
+      }
     }
 
     static void deleteTableHdfsAcl(Table aclTable, TableName tableName) throws 
IOException {
@@ -600,12 +698,12 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
       aclTable.delete(delete);
     }
 
-    static List<String> getTableUsers(Table aclTable, TableName tableName) 
throws IOException {
+    static Set<String> getTableUsers(Table aclTable, TableName tableName) 
throws IOException {
       return getEntryUsers(aclTable, tableName.getName());
     }
 
-    private static List<String> getEntryUsers(Table aclTable, byte[] entry) 
throws IOException {
-      List<String> users = new ArrayList<>();
+    private static Set<String> getEntryUsers(Table aclTable, byte[] entry) 
throws IOException {
+      Set<String> users = new HashSet<>();
       Get get = new Get(entry);
       get.addFamily(HDFS_ACL_FAMILY);
       Result result = aclTable.get(get);
@@ -624,7 +722,7 @@ public class SnapshotScannerHDFSAclController implements 
MasterCoprocessor, Mast
         String userName) throws IOException {
       Set<String> namespaces = new HashSet<>();
       Set<TableName> tables = new HashSet<>();
-      List<byte[]> userEntries = 
SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName);
+      List<byte[]> userEntries = getUserEntries(aclTable, userName);
       for (byte[] entry : userEntries) {
         if (PermissionStorage.isNamespaceEntry(entry)) {
           
namespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
index 2a62ff0..60d9155 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
@@ -23,7 +23,6 @@ import static 
org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
 import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
 import static org.apache.hadoop.fs.permission.AclEntryType.USER;
 import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
-import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
 
 import java.io.Closeable;
 import java.io.FileNotFoundException;
@@ -31,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -58,6 +59,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -124,20 +126,16 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
       pathHelper.getTmpDir(), pathHelper.getArchiveDir());
     paths.addAll(getGlobalRootPaths());
     for (Path path : paths) {
-      if (!fs.exists(path)) {
-        fs.mkdirs(path);
-      }
+      createDirIfNotExist(path);
       fs.setPermission(path, new FsPermission(
           conf.get(COMMON_DIRECTORY_PERMISSION, 
COMMON_DIRECTORY_PERMISSION_DEFAULT)));
     }
     // create snapshot restore directory
     Path restoreDir =
         new Path(conf.get(SNAPSHOT_RESTORE_TMP_DIR, 
SNAPSHOT_RESTORE_TMP_DIR_DEFAULT));
-    if (!fs.exists(restoreDir)) {
-      fs.mkdirs(restoreDir);
-      fs.setPermission(restoreDir, new 
FsPermission(conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
-        SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
-    }
+    createDirIfNotExist(restoreDir);
+    fs.setPermission(restoreDir, new 
FsPermission(conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
+      SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
   }
 
   /**
@@ -194,11 +192,12 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
       long start = System.currentTimeMillis();
       TableName tableName = snapshot.getTableName();
       // global user permission can be inherited from default acl automatically
-      Set<String> userSet = getUsersWithTableReadAction(tableName);
-      
userSet.addAll(getUsersWithNamespaceReadAction(tableName.getNamespaceAsString()));
-      Path path = pathHelper.getSnapshotDir(snapshot.getName());
-      handleHDFSAcl(new HDFSAclOperation(fs, path, userSet, 
HDFSAclOperation.OperationType.MODIFY,
-          true, HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS)).get();
+      Set<String> userSet = getUsersWithTableReadAction(tableName, true, 
false);
+      if (userSet.size() > 0) {
+        Path path = pathHelper.getSnapshotDir(snapshot.getName());
+        handleHDFSAcl(new HDFSAclOperation(fs, path, userSet, 
HDFSAclOperation.OperationType.MODIFY,
+            true, HDFSAclOperation.AclType.DEFAULT_ADN_ACCESS)).get();
+      }
       LOG.info("Set HDFS acl when snapshot {}, cost {} ms", snapshot.getName(),
         System.currentTimeMillis() - start);
       return true;
@@ -209,71 +208,70 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
   }
 
   /**
-   * Reset acl when truncate table
-   * @param tableName the specific table
+   * Remove table access acl from namespace dir when delete table
+   * @param tableName the table
+   * @param removeUsers the users whose access acl will be removed
    * @return false if an error occurred, otherwise true
    */
-  public boolean resetTableAcl(TableName tableName) {
+  public boolean removeNamespaceAccessAcl(TableName tableName, Set<String> 
removeUsers,
+      String operation) {
     try {
       long start = System.currentTimeMillis();
-      // global and namespace user permission can be inherited from default 
acl automatically
-      setTableAcl(tableName, getUsersWithTableReadAction(tableName));
-      LOG.info("Set HDFS acl when truncate {}, cost {} ms", tableName,
+      if (removeUsers.size() > 0) {
+        handleNamespaceAccessAcl(tableName.getNamespaceAsString(), removeUsers,
+          HDFSAclOperation.OperationType.REMOVE);
+      }
+      LOG.info("Remove HDFS acl when {} table {}, cost {} ms", operation, 
tableName,
         System.currentTimeMillis() - start);
       return true;
     } catch (Exception e) {
-      LOG.error("Set HDFS acl error when truncate {}", tableName, e);
+      LOG.error("Remove HDFS acl error when {} table {}", operation, 
tableName, e);
       return false;
     }
   }
 
   /**
-   * Remove table access acl from namespace dir when delete table
+   * Add table user acls
    * @param tableName the table
-   * @param removeUsers the users whose access acl will be removed
+   * @param users the table users with READ permission
    * @return false if an error occurred, otherwise true
    */
-  public boolean removeNamespaceAcl(TableName tableName, Set<String> 
removeUsers) {
+  public boolean addTableAcl(TableName tableName, Set<String> users, String 
operation) {
     try {
       long start = System.currentTimeMillis();
-      List<AclEntry> aclEntries = removeUsers.stream()
-          .map(removeUser -> aclEntry(ACCESS, 
removeUser)).collect(Collectors.toList());
-      String namespace = tableName.getNamespaceAsString();
-      List<Path> nsPaths = 
Lists.newArrayList(pathHelper.getTmpNsDir(namespace),
-        pathHelper.getDataNsDir(namespace), 
pathHelper.getMobDataNsDir(namespace));
-      // If table has no snapshots, then remove archive ns HDFS acl, otherwise 
reserve the archive
-      // ns acl to make the snapshots can be scanned, in the second case, need 
to remove the archive
-      // ns acl when all snapshots of the deleted table are deleted (will do 
it in later work).
-      if (getTableSnapshotPaths(tableName).isEmpty()) {
-        nsPaths.add(pathHelper.getArchiveNsDir(namespace));
+      if (users.size() > 0) {
+        HDFSAclOperation.OperationType operationType = 
HDFSAclOperation.OperationType.MODIFY;
+        handleNamespaceAccessAcl(tableName.getNamespaceAsString(), users, 
operationType);
+        handleTableAcl(Sets.newHashSet(tableName), users, new HashSet<>(0), 
new HashSet<>(0),
+          operationType);
       }
-      for (Path nsPath : nsPaths) {
-        fs.removeAclEntries(nsPath, aclEntries);
-      }
-      LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName,
+      LOG.info("Set HDFS acl when {} table {}, cost {} ms", operation, 
tableName,
         System.currentTimeMillis() - start);
       return true;
     } catch (Exception e) {
-      LOG.error("Set HDFS acl error when delete table {}", tableName, e);
+      LOG.error("Set HDFS acl error when {} table {}", operation, tableName, 
e);
       return false;
     }
   }
 
   /**
-   * Set table owner acl when create table
+   * Remove table acls when modify table
    * @param tableName the table
-   * @param user the table owner
+   * @param users the table users with READ permission
    * @return false if an error occurred, otherwise true
    */
-  public boolean addTableAcl(TableName tableName, String user) {
+  public boolean removeTableAcl(TableName tableName, Set<String> users) {
     try {
       long start = System.currentTimeMillis();
-      setTableAcl(tableName, Sets.newHashSet(user));
-      LOG.info("Set HDFS acl when create table {}, cost {} ms", tableName,
+      if (users.size() > 0) {
+        handleTableAcl(Sets.newHashSet(tableName), users, new HashSet<>(0), 
new HashSet<>(0),
+          HDFSAclOperation.OperationType.REMOVE);
+      }
+      LOG.info("Set HDFS acl when create or modify table {}, cost {} ms", 
tableName,
         System.currentTimeMillis() - start);
       return true;
     } catch (Exception e) {
-      LOG.error("Set HDFS acl error when create table {}", tableName, e);
+      LOG.error("Set HDFS acl error when create or modify table {}", 
tableName, e);
       return false;
     }
   }
@@ -323,6 +321,7 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
       HDFSAclOperation.OperationType operationType)
       throws ExecutionException, InterruptedException, IOException {
     namespaces.removeAll(skipNamespaces);
+    namespaces.remove(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
     // handle namespace root directories HDFS acls
     List<HDFSAclOperation> hdfsAclOperations = new ArrayList<>();
     Set<String> skipTableNamespaces =
@@ -350,7 +349,8 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
     Set<TableName> tables = new HashSet<>();
     for (String namespace : namespaces) {
       
tables.addAll(admin.listTableDescriptorsByNamespace(Bytes.toBytes(namespace)).stream()
-          .map(TableDescriptor::getTableName).collect(Collectors.toSet()));
+          
.filter(this::isTableUserScanSnapshotEnabled).map(TableDescriptor::getTableName)
+          .collect(Collectors.toSet()));
     }
     handleTableAcl(tables, users, skipNamespaces, skipTables, operationType);
   }
@@ -392,12 +392,11 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
     future.get();
   }
 
-  private void setTableAcl(TableName tableName, Set<String> users)
-      throws ExecutionException, InterruptedException, IOException {
-    HDFSAclOperation.OperationType operationType = 
HDFSAclOperation.OperationType.MODIFY;
-    handleNamespaceAccessAcl(tableName.getNamespaceAsString(), users, 
operationType);
-    handleTableAcl(Sets.newHashSet(tableName), users, new HashSet<>(0), new 
HashSet<>(0),
-      operationType);
+  void createTableDirectories(TableName tableName) throws IOException {
+    List<Path> paths = getTableRootPaths(tableName, false);
+    for (Path path : paths) {
+      createDirIfNotExist(path);
+    }
   }
 
   /**
@@ -413,13 +412,10 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
    * return paths that user will namespace permission will visit
    * @param namespace the namespace
    * @return the path list
-   * @throws IOException if an error occurred
    */
   List<Path> getNamespaceRootPaths(String namespace) {
-    List<Path> paths =
-        Lists.newArrayList(pathHelper.getTmpNsDir(namespace), 
pathHelper.getDataNsDir(namespace),
-          pathHelper.getMobDataNsDir(namespace), 
pathHelper.getArchiveNsDir(namespace));
-    return paths;
+    return Lists.newArrayList(pathHelper.getTmpNsDir(namespace), 
pathHelper.getDataNsDir(namespace),
+      pathHelper.getMobDataNsDir(namespace), 
pathHelper.getArchiveNsDir(namespace));
   }
 
   /**
@@ -448,27 +444,76 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
   }
 
   /**
+   * Return users with global read permission
+   * @return users with global read permission
+   * @throws IOException if an error occurred
+   */
+  private Set<String> getUsersWithGlobalReadAction() throws IOException {
+    return 
getUsersWithReadAction(PermissionStorage.getGlobalPermissions(conf));
+  }
+
+  /**
    * Return users with namespace read permission
    * @param namespace the namespace
+   * @param includeGlobal true if include users with global read action
    * @return users with namespace read permission
    * @throws IOException if an error occurred
    */
-  private Set<String> getUsersWithNamespaceReadAction(String namespace) throws 
IOException {
-    return PermissionStorage.getNamespacePermissions(conf, 
namespace).entries().stream()
-        .filter(entry -> entry.getValue().getPermission().implies(READ))
-        .map(entry -> entry.getKey()).collect(Collectors.toSet());
+  Set<String> getUsersWithNamespaceReadAction(String namespace, boolean 
includeGlobal)
+      throws IOException {
+    Set<String> users =
+        getUsersWithReadAction(PermissionStorage.getNamespacePermissions(conf, 
namespace));
+    if (includeGlobal) {
+      users.addAll(getUsersWithGlobalReadAction());
+    }
+    return users;
   }
 
   /**
    * Return users with table read permission
    * @param tableName the table
+   * @param includeNamespace true if include users with namespace read action
+   * @param includeGlobal true if include users with global read action
    * @return users with table read permission
    * @throws IOException if an error occurred
    */
-  private Set<String> getUsersWithTableReadAction(TableName tableName) throws 
IOException {
-    return PermissionStorage.getTablePermissions(conf, 
tableName).entries().stream()
-        .filter(entry -> entry.getValue().getPermission().implies(READ))
-        .map(entry -> entry.getKey()).collect(Collectors.toSet());
+  Set<String> getUsersWithTableReadAction(TableName tableName, boolean 
includeNamespace,
+      boolean includeGlobal) throws IOException {
+    Set<String> users =
+        getUsersWithReadAction(PermissionStorage.getTablePermissions(conf, 
tableName));
+    if (includeNamespace) {
+      users
+          
.addAll(getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), 
includeGlobal));
+    }
+    return users;
+  }
+
+  private Set<String>
+      getUsersWithReadAction(ListMultimap<String, UserPermission> 
permissionMultimap) {
+    return permissionMultimap.entries().stream()
+        .filter(entry -> 
checkUserPermission(entry.getValue())).map(Map.Entry::getKey)
+        .collect(Collectors.toSet());
+  }
+
+  private boolean checkUserPermission(UserPermission userPermission) {
+    boolean result = containReadAction(userPermission);
+    if (result && userPermission.getPermission() instanceof TablePermission) {
+      result = isNotFamilyOrQualifierPermission((TablePermission) 
userPermission.getPermission());
+    }
+    return result;
+  }
+
+  boolean containReadAction(UserPermission userPermission) {
+    return userPermission.getPermission().implies(Permission.Action.READ);
+  }
+
+  boolean isNotFamilyOrQualifierPermission(TablePermission tablePermission) {
+    return !tablePermission.hasFamily() && !tablePermission.hasQualifier();
+  }
+
+  boolean isTableUserScanSnapshotEnabled(TableDescriptor tableDescriptor) {
+    return tableDescriptor == null ? false
+        : Boolean.valueOf(tableDescriptor.getValue(USER_SCAN_SNAPSHOT_ENABLE));
   }
 
   PathHelper getPathHelper() {
@@ -515,6 +560,18 @@ public class SnapshotScannerHDFSAclHelper implements 
Closeable {
         .setPermission(READ_EXECUTE).build();
   }
 
+  void createDirIfNotExist(Path path) throws IOException {
+    if (!fs.exists(path)) {
+      fs.mkdirs(path);
+    }
+  }
+
+  void deleteEmptyDir(Path path) throws IOException {
+    if (fs.exists(path) && fs.listStatus(path).length == 0) {
+      fs.delete(path, false);
+    }
+  }
+
   /**
    * Inner class used to describe modify or remove what type of acl 
entries(ACCESS, DEFAULT,
    * ACCESS_AND_DEFAULT) for files or directories(and child files).
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
index 7e5bdca..69a4834 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java
@@ -33,6 +33,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -55,6 +57,7 @@ import 
org.apache.hadoop.hbase.testclassification.SecurityTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -81,6 +84,7 @@ public class TestSnapshotScannerHDFSAclController {
   private static FileSystem fs = null;
   private static Path rootDir = null;
   private static User unGrantUser = null;
+  private static SnapshotScannerHDFSAclHelper helper;
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
@@ -107,6 +111,7 @@ public class TestSnapshotScannerHDFSAclController {
     rootDir = TEST_UTIL.getDefaultRootDirPath();
     fs = rootDir.getFileSystem(conf);
     unGrantUser = User.createUserForTesting(conf, UN_GRANT_USER, new String[] 
{});
+    helper = new SnapshotScannerHDFSAclHelper(conf, admin.getConnection());
 
     // set hbase directory permission
     FsPermission commonDirectoryPermission =
@@ -132,7 +137,9 @@ public class TestSnapshotScannerHDFSAclController {
       path = path.getParent();
     }
 
-    TEST_UTIL.waitTableAvailable(PermissionStorage.ACL_TABLE_NAME);
+    SnapshotScannerHDFSAclController coprocessor = 
TEST_UTIL.getHBaseCluster().getMaster()
+        
.getMasterCoprocessorHost().findCoprocessor(SnapshotScannerHDFSAclController.class);
+    TEST_UTIL.waitFor(1200000, () -> coprocessor.checkInitialized("check 
initialized"));
   }
 
   @AfterClass
@@ -472,25 +479,25 @@ public class TestSnapshotScannerHDFSAclController {
       // delete
       admin.disableTable(table);
       admin.deleteTable(table);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 
-1);
 
       // restore snapshot and restore acl
       admin.restoreSnapshot(snapshot, true, true);
       TestHDFSAclHelper.put2(t);
       // snapshot
       admin.snapshot(snapshot2, table);
-      // delete
-      admin.disableTable(table);
-      admin.deleteTable(table);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 
10);
 
+      // delete
+      admin.disableTable(table);
+      admin.deleteTable(table);
       // restore snapshot and skip restore acl
       admin.restoreSnapshot(snapshot);
       admin.snapshot(snapshot3, table);
 
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 
10);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 
-1);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 
-1);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, 
-1);
     }
   }
@@ -524,7 +531,7 @@ public class TestSnapshotScannerHDFSAclController {
       admin.disableTable(tableName1);
       admin.deleteTable(tableName1);
       // grantUser2 and grantUser3 should have data/ns acl
-      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser1, snapshot1, 
6);
+      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser1, snapshot1, 
-1);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser2, snapshot2, 
6);
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser3, snapshot2, 
6);
     }
@@ -572,11 +579,6 @@ public class TestSnapshotScannerHDFSAclController {
     Path archiveTableDir = HFileArchiveUtil.getTableArchivePath(rootDir, 
table);
     assertTrue(fs.exists(archiveTableDir));
 
-    // delete table and grant user can scan snapshot
-    admin.disableTable(table);
-    admin.deleteTable(table);
-    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
-
     // Check SnapshotScannerHDFSAclCleaner method
     
assertTrue(SnapshotScannerHDFSAclCleaner.isArchiveTableDir(archiveTableDir));
     
assertTrue(SnapshotScannerHDFSAclCleaner.isArchiveNamespaceDir(archiveTableDir.getParent()));
@@ -602,11 +604,112 @@ public class TestSnapshotScannerHDFSAclController {
       TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
     }
   }
+
+  @Test
+  public void testModifyTable() throws Exception {
+    String namespace = name.getMethodName();
+    TableName table = TableName.valueOf(namespace, "t1");
+    String snapshot = namespace + "t1";
+    TableName table2 = TableName.valueOf(namespace, "t2");
+
+    String tableUserName = name.getMethodName();
+    User tableUser = User.createUserForTesting(conf, tableUserName, new 
String[] {});
+    String tableUserName2 = tableUserName + "2";
+    User tableUser2 = User.createUserForTesting(conf, tableUserName2, new 
String[] {});
+    String tableUserName3 = tableUserName + "3";
+    User tableUser3 = User.createUserForTesting(conf, tableUserName3, new 
String[] {});
+    String nsUserName = tableUserName + "-ns";
+    User nsUser = User.createUserForTesting(conf, nsUserName, new String[] {});
+    String globalUserName = tableUserName + "-global";
+    User globalUser = User.createUserForTesting(conf, globalUserName, new 
String[] {});
+    String globalUserName2 = tableUserName + "-global-2";
+    User globalUser2 = User.createUserForTesting(conf, globalUserName2, new 
String[] {});
+
+    SecureTestUtil.grantGlobal(TEST_UTIL, globalUserName, READ);
+    TestHDFSAclHelper.createNamespace(TEST_UTIL, namespace);
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, nsUserName, namespace, READ);
+    TableDescriptor td = 
TestHDFSAclHelper.createUserScanSnapshotDisabledTable(TEST_UTIL, table);
+    admin.snapshot(snapshot, table);
+    SecureTestUtil.grantGlobal(TEST_UTIL, globalUserName2, READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, tableUserName, table, READ);
+    SecureTestUtil.grantOnTable(TEST_UTIL, tableUserName2, table, 
TestHDFSAclHelper.COLUMN1, null,
+      READ);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, tableUserName3, table, WRITE);
+
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser2, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser3, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, nsUser, snapshot, -1);
+    // Global permission is set before table is created, the acl is inherited
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser, snapshot, 6);
+    // Global permission is set after table is created, the table dir acl is 
skip
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser2, snapshot, 
-1);
+
+    // enable user scan snapshot
+    admin.modifyTable(TableDescriptorBuilder.newBuilder(td)
+        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, 
"true").build());
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser, snapshot, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser2, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser3, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, nsUser, snapshot, 6);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser, snapshot, 6);
+    // disable user scan snapshot
+    SecureTestUtil.grantOnNamespace(TEST_UTIL, tableUserName2, namespace, 
READ);
+    TestHDFSAclHelper.createTable(TEST_UTIL, table2);
+    TestHDFSAclHelper.grantOnTable(TEST_UTIL, tableUserName3, table2, READ);
+    admin.modifyTable(TableDescriptorBuilder.newBuilder(td)
+        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, 
"false").build());
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser2, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, tableUser3, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, nsUser, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser, snapshot, -1);
+    TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, globalUser2, snapshot, 
-1);
+    List<Path> namespaceRootPaths = helper.getNamespaceRootPaths(namespace);
+    List<Path> tableRootPaths = helper.getTableRootPaths(table, false);
+    // check access
+    for (Path path : tableRootPaths) {
+      checkUserAclEntry(path, tableUserName, false, false);
+      checkUserAclEntry(path, tableUserName2, false, false);
+      checkUserAclEntry(path, tableUserName3, false, false);
+      checkUserAclEntry(path, nsUserName, false, false);
+      checkUserAclEntry(path, globalUserName, false, false);
+      checkUserAclEntry(path, globalUserName2, false, false);
+    }
+    for (Path path : namespaceRootPaths) {
+      checkUserAclEntry(path, tableUserName, false, false);
+      checkUserAclEntry(path, tableUserName2, true, true);
+      checkUserAclEntry(path, tableUserName3, true, false);
+      checkUserAclEntry(path, nsUserName, true, true);
+      checkUserAclEntry(path, globalUserName, true, true);
+      checkUserAclEntry(path, globalUserName2, true, true);
+    }
+  }
+
+  private void checkUserAclEntry(Path path, String userName, boolean 
requireAccessAcl,
+      boolean requireDefaultAcl) throws IOException {
+    boolean accessAclEntry = false;
+    boolean defaultAclEntry = false;
+    if (fs.exists(path)) {
+      for (AclEntry aclEntry : fs.getAclStatus(path).getEntries()) {
+        String user = aclEntry.getName();
+        if (user != null && user.equals(userName)) {
+          if (aclEntry.getScope() == AclEntryScope.DEFAULT) {
+            defaultAclEntry = true;
+          } else if (aclEntry.getScope() == AclEntryScope.ACCESS) {
+            accessAclEntry = true;
+          }
+        }
+      }
+    }
+    String message = "require user: " + userName + ", path: " + 
path.toString() + " acl";
+    Assert.assertEquals(message, requireAccessAcl, accessAclEntry);
+    Assert.assertEquals(message, requireDefaultAcl, defaultAclEntry);
+  }
 }
 
 final class TestHDFSAclHelper {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestHDFSAclHelper.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHDFSAclHelper.class);
 
   private TestHDFSAclHelper() {
   }
@@ -616,10 +719,9 @@ final class TestHDFSAclHelper {
     SecureTestUtil.grantOnTable(util, user, tableName, null, null, actions);
   }
 
-  private static void createNamespace(HBaseTestingUtility util, String 
namespace)
-      throws IOException {
-    if (!Arrays.stream(util.getAdmin().listNamespaceDescriptors())
-        .anyMatch(ns -> ns.getName().equals(namespace))) {
+  static void createNamespace(HBaseTestingUtility util, String namespace) 
throws IOException {
+    if (Arrays.stream(util.getAdmin().listNamespaceDescriptors())
+        .noneMatch(ns -> ns.getName().equals(namespace))) {
       NamespaceDescriptor namespaceDescriptor = 
NamespaceDescriptor.create(namespace).build();
       util.getAdmin().createNamespace(namespaceDescriptor);
     }
@@ -627,11 +729,8 @@ final class TestHDFSAclHelper {
 
   static Table createTable(HBaseTestingUtility util, TableName tableName) 
throws IOException {
     createNamespace(util, tableName.getNamespaceAsString());
-    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
-        
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN1).build())
-        
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN2).build())
-        .setOwner(User.createUserForTesting(util.getConfiguration(), "owner", 
new String[] {}))
-        .build();
+    TableDescriptor td = getTableDescriptorBuilder(util, tableName)
+        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, 
"true").build();
     byte[][] splits = new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") };
     return util.createTable(td, splits);
   }
@@ -644,11 +743,30 @@ final class TestHDFSAclHelper {
         
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN2).setMobEnabled(true)
             .setMobThreshold(0).build())
         .setOwner(User.createUserForTesting(util.getConfiguration(), "owner", 
new String[] {}))
-        .build();
+        .setValue(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, 
"true").build();
     byte[][] splits = new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") };
     return util.createTable(td, splits);
   }
 
+  static TableDescriptor 
createUserScanSnapshotDisabledTable(HBaseTestingUtility util,
+      TableName tableName) throws IOException {
+    createNamespace(util, tableName.getNamespaceAsString());
+    TableDescriptor td = getTableDescriptorBuilder(util, tableName).build();
+    byte[][] splits = new byte[][] { Bytes.toBytes("2"), Bytes.toBytes("4") };
+    try (Table t = util.createTable(td, splits)) {
+      put(t);
+    }
+    return td;
+  }
+
+  private static TableDescriptorBuilder 
getTableDescriptorBuilder(HBaseTestingUtility util,
+      TableName tableName) {
+    return TableDescriptorBuilder.newBuilder(tableName)
+        
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN1).build())
+        
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN2).build())
+        .setOwner(User.createUserForTesting(util.getConfiguration(), "owner", 
new String[] {}));
+  }
+
   static void createTableAndPut(HBaseTestingUtility util, TableName tableNam) 
throws IOException {
     try (Table t = createTable(util, tableNam)) {
       put(t);
@@ -701,7 +819,7 @@ final class TestHDFSAclHelper {
 
   private static PrivilegedExceptionAction<Void> 
getScanSnapshotAction(Configuration conf,
       String snapshotName, long expectedRowCount) {
-    PrivilegedExceptionAction<Void> action = () -> {
+    return () -> {
       try {
         Path restoreDir = new 
Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
         Scan scan = new Scan();
@@ -723,6 +841,5 @@ final class TestHDFSAclHelper {
       }
       return null;
     };
-    return action;
   }
 }
\ No newline at end of file

Reply via email to