http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index b06b2bf..97b3456 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -177,10 +177,10 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   private static final String TAG_CHECK_PASSED = "tag_check_passed";
   private static final byte[] TRUE = Bytes.toBytes(true);
 
-  TableAuthManager authManager = null;
+  private AccessChecker accessChecker;
 
   /** flags if we are running on a region of the _acl_ table */
-  boolean aclRegion = false;
+  private boolean aclRegion = false;
 
   /** defined only for Endpoint implementation, so it can have way to
    access region services */
@@ -195,19 +195,19 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   /** Provider for mapping principal names to Users */
   private UserProvider userProvider;
 
-  /** if we are active, usually true, only not true if 
"hbase.security.authorization"
-   has been set to false in site configuration */
-  boolean authorizationEnabled;
+  /** if we are active, usually false, only true if 
"hbase.security.authorization"
+   has been set to true in site configuration */
+  private boolean authorizationEnabled;
 
   /** if we are able to support cell ACLs */
-  boolean cellFeaturesEnabled;
+  private boolean cellFeaturesEnabled;
 
   /** if we should check EXEC permissions */
-  boolean shouldCheckExecPermission;
+  private boolean shouldCheckExecPermission;
 
   /** if we should terminate access checks early as soon as table or CF grants
     allow access; pre-0.98 compatible behavior */
-  boolean compatibleEarlyTermination;
+  private boolean compatibleEarlyTermination;
 
   /** if we have been successfully initialized */
   private volatile boolean initialized = false;
@@ -215,12 +215,8 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   /** if the ACL table is available, only relevant in the master */
   private volatile boolean aclTabAvailable = false;
 
-  public static boolean isAuthorizationSupported(Configuration conf) {
-    return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
-  }
-
   public static boolean isCellAuthorizationSupported(Configuration conf) {
-    return isAuthorizationSupported(conf) &&
+    return AccessChecker.isAuthorizationSupported(conf) &&
         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
   }
 
@@ -229,10 +225,10 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   }
 
   public TableAuthManager getAuthManager() {
-    return authManager;
+    return accessChecker.getAuthManager();
   }
 
-  void initialize(RegionCoprocessorEnvironment e) throws IOException {
+  private void initialize(RegionCoprocessorEnvironment e) throws IOException {
     final Region region = e.getRegion();
     Configuration conf = e.getConfiguration();
     Map<byte[], ListMultimap<String,TablePermission>> tables =
@@ -244,7 +240,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       byte[] entry = t.getKey();
       ListMultimap<String,TablePermission> perms = t.getValue();
       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, 
conf);
-      this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, 
serialized);
+      getAuthManager().getZKPermissionWatcher().writeToZookeeper(entry, 
serialized);
     }
     initialized = true;
   }
@@ -254,7 +250,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
    * table updates.
    */
-  void updateACL(RegionCoprocessorEnvironment e,
+  private void updateACL(RegionCoprocessorEnvironment e,
       final Map<byte[], List<Cell>> familyMap) {
     Set<byte[]> entries =
         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
@@ -268,7 +264,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         }
       }
     }
-    ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
+    ZKPermissionWatcher zkw = getAuthManager().getZKPermissionWatcher();
     Configuration conf = regionEnv.getConfiguration();
     for (byte[] entry: entries) {
       try {
@@ -320,7 +316,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     }
 
     // 2. check for the table-level, if successful we can short-circuit
-    if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
+    if (getAuthManager().authorize(user, tableName, (byte[])null, 
permRequest)) {
       return AuthResult.allow(request, "Table permission granted", user,
         permRequest, tableName, families);
     }
@@ -330,7 +326,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       // all families must pass
       for (Map.Entry<byte [], ? extends Collection<?>> family : 
families.entrySet()) {
         // a) check for family level access
-        if (authManager.authorize(user, tableName, family.getKey(),
+        if (getAuthManager().authorize(user, tableName, family.getKey(),
             permRequest)) {
           continue;  // family-level permission overrides per-qualifier
         }
@@ -341,7 +337,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
             // for each qualifier of the family
             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
             for (byte[] qualifier : familySet) {
-              if (!authManager.authorize(user, tableName, family.getKey(),
+              if (!getAuthManager().authorize(user, tableName, family.getKey(),
                                          qualifier, permRequest)) {
                 return AuthResult.deny(request, "Failed qualifier check", user,
                     permRequest, tableName, makeFamilyMap(family.getKey(), 
qualifier));
@@ -350,7 +346,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
           } else if (family.getValue() instanceof List) { // List<KeyValue>
             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
             for (KeyValue kv : kvList) {
-              if (!authManager.authorize(user, tableName, family.getKey(),
+              if (!getAuthManager().authorize(user, tableName, family.getKey(),
                       kv.getQualifier(), permRequest)) {
                 return AuthResult.deny(request, "Failed qualifier check", user,
                     permRequest, tableName, makeFamilyMap(family.getKey(), 
kv.getQualifier()));
@@ -397,18 +393,6 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     return result;
   }
 
-  private void logResult(AuthResult result) {
-    if (AUDITLOG.isTraceEnabled()) {
-      InetAddress remoteAddr = RpcServer.getRemoteAddress();
-      AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
-          " for user " + (result.getUser() != null ? 
result.getUser().getShortName() : "UNKNOWN") +
-          "; reason: " + result.getReason() +
-          "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
-          "; request: " + result.getRequest() +
-          "; context: " + result.toContextString());
-    }
-  }
-
   /**
    * Returns the active user to which authorization checks should be applied.
    * If we are in the context of an RPC call, the remote user is used,
@@ -424,224 +408,6 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   }
 
   /**
-   * Authorizes that the current user has any of the given permissions for the
-   * given table, column family and column qualifier.
-   * @param tableName Table requested
-   * @param family Column family requested
-   * @param qualifier Column qualifier requested
-   * @throws IOException if obtaining the current user fails
-   * @throws AccessDeniedException if user has no authorization
-   */
-  private void requirePermission(String request, TableName tableName, byte[] 
family,
-      byte[] qualifier, Action... permissions) throws IOException {
-    User user = getActiveUser();
-    AuthResult result = null;
-
-    for (Action permission : permissions) {
-      if (authManager.authorize(user, tableName, family, qualifier, 
permission)) {
-        result = AuthResult.allow(request, "Table permission granted", user,
-                                  permission, tableName, family, qualifier);
-        break;
-      } else {
-        // rest of the world
-        result = AuthResult.deny(request, "Insufficient permissions", user,
-                                 permission, tableName, family, qualifier);
-      }
-    }
-    logResult(result);
-    if (authorizationEnabled && !result.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
-    }
-  }
-
-  /**
-   * Authorizes that the current user has any of the given permissions for the
-   * given table, column family and column qualifier.
-   * @param tableName Table requested
-   * @param family Column family param
-   * @param qualifier Column qualifier param
-   * @throws IOException if obtaining the current user fails
-   * @throws AccessDeniedException if user has no authorization
-   */
-  private void requireTablePermission(String request, TableName tableName, 
byte[] family,
-      byte[] qualifier, Action... permissions) throws IOException {
-    User user = getActiveUser();
-    AuthResult result = null;
-
-    for (Action permission : permissions) {
-      if (authManager.authorize(user, tableName, null, null, permission)) {
-        result = AuthResult.allow(request, "Table permission granted", user,
-            permission, tableName, null, null);
-        result.getParams().setFamily(family).setQualifier(qualifier);
-        break;
-      } else {
-        // rest of the world
-        result = AuthResult.deny(request, "Insufficient permissions", user,
-            permission, tableName, family, qualifier);
-        result.getParams().setFamily(family).setQualifier(qualifier);
-      }
-    }
-    logResult(result);
-    if (authorizationEnabled && !result.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
-    }
-  }
-
-  /**
-   * Authorizes that the current user has any of the given permissions to 
access the table.
-   *
-   * @param tableName Table requested
-   * @param permissions Actions being requested
-   * @throws IOException if obtaining the current user fails
-   * @throws AccessDeniedException if user has no authorization
-   */
-  private void requireAccess(String request, TableName tableName,
-      Action... permissions) throws IOException {
-    User user = getActiveUser();
-    AuthResult result = null;
-
-    for (Action permission : permissions) {
-      if (authManager.hasAccess(user, tableName, permission)) {
-        result = AuthResult.allow(request, "Table permission granted", user,
-                                  permission, tableName, null, null);
-        break;
-      } else {
-        // rest of the world
-        result = AuthResult.deny(request, "Insufficient permissions", user,
-                                 permission, tableName, null, null);
-      }
-    }
-    logResult(result);
-    if (authorizationEnabled && !result.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
-    }
-  }
-
-  /**
-   * Authorizes that the current user has global privileges for the given 
action.
-   * @param perm The action being requested
-   * @throws IOException if obtaining the current user fails
-   * @throws AccessDeniedException if authorization is denied
-   */
-  private void requirePermission(String request, Action perm) throws 
IOException {
-    requireGlobalPermission(request, perm, null, null);
-  }
-
-  /**
-   * Checks that the user has the given global permission. The generated
-   * audit log message will contain context information for the operation
-   * being authorized, based on the given parameters.
-   * @param perm Action being requested
-   * @param tableName Affected table name.
-   * @param familyMap Affected column families.
-   */
-  private void requireGlobalPermission(String request, Action perm, TableName 
tableName,
-      Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
-    User user = getActiveUser();
-    AuthResult result = null;
-    if (authManager.authorize(user, perm)) {
-      result = AuthResult.allow(request, "Global check allowed", user, perm, 
tableName, familyMap);
-      result.getParams().setTableName(tableName).setFamilies(familyMap);
-      logResult(result);
-    } else {
-      result = AuthResult.deny(request, "Global check failed", user, perm, 
tableName, familyMap);
-      result.getParams().setTableName(tableName).setFamilies(familyMap);
-      logResult(result);
-      if (authorizationEnabled) {
-        throw new AccessDeniedException("Insufficient permissions for user '" +
-          (user != null ? user.getShortName() : "null") +"' (global, action=" +
-          perm.toString() + ")");
-      }
-    }
-  }
-
-  /**
-   * Checks that the user has the given global permission. The generated
-   * audit log message will contain context information for the operation
-   * being authorized, based on the given parameters.
-   * @param perm Action being requested
-   * @param namespace
-   */
-  private void requireGlobalPermission(String request, Action perm,
-                                       String namespace) throws IOException {
-    User user = getActiveUser();
-    AuthResult authResult = null;
-    if (authManager.authorize(user, perm)) {
-      authResult = AuthResult.allow(request, "Global check allowed", user, 
perm, null);
-      authResult.getParams().setNamespace(namespace);
-      logResult(authResult);
-    } else {
-      authResult = AuthResult.deny(request, "Global check failed", user, perm, 
null);
-      authResult.getParams().setNamespace(namespace);
-      logResult(authResult);
-      if (authorizationEnabled) {
-        throw new AccessDeniedException("Insufficient permissions for user '" +
-          (user != null ? user.getShortName() : "null") +"' (global, action=" +
-          perm.toString() + ")");
-      }
-    }
-  }
-
-  /**
-   * Checks that the user has the given global or namespace permission.
-   * @param namespace
-   * @param permissions Actions being requested
-   */
-  public void requireNamespacePermission(String request, String namespace,
-      Action... permissions) throws IOException {
-    User user = getActiveUser();
-    AuthResult result = null;
-
-    for (Action permission : permissions) {
-      if (authManager.authorize(user, namespace, permission)) {
-        result = AuthResult.allow(request, "Namespace permission granted",
-            user, permission, namespace);
-        break;
-      } else {
-        // rest of the world
-        result = AuthResult.deny(request, "Insufficient permissions", user,
-            permission, namespace);
-      }
-    }
-    logResult(result);
-    if (authorizationEnabled && !result.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions "
-          + result.toContextString());
-    }
-  }
-
-  /**
-   * Checks that the user has the given global or namespace permission.
-   * @param namespace
-   * @param permissions Actions being requested
-   */
-  public void requireNamespacePermission(String request, String namespace, 
TableName tableName,
-      Map<byte[], ? extends Collection<byte[]>> familyMap, Action... 
permissions)
-      throws IOException {
-    User user = getActiveUser();
-    AuthResult result = null;
-
-    for (Action permission : permissions) {
-      if (authManager.authorize(user, namespace, permission)) {
-        result = AuthResult.allow(request, "Namespace permission granted",
-            user, permission, namespace);
-        result.getParams().setTableName(tableName).setFamilies(familyMap);
-        break;
-      } else {
-        // rest of the world
-        result = AuthResult.deny(request, "Insufficient permissions", user,
-            permission, namespace);
-        result.getParams().setTableName(tableName).setFamilies(familyMap);
-      }
-    }
-    logResult(result);
-    if (authorizationEnabled && !result.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions "
-          + result.toContextString());
-    }
-  }
-
-  /**
    * Returns <code>true</code> if the current user is allowed the given action
    * over at least one of the column qualifiers in the given column families.
    */
@@ -663,13 +429,13 @@ public class AccessController extends 
BaseMasterAndRegionObserver
           familyMap.entrySet()) {
         if (family.getValue() != null && !family.getValue().isEmpty()) {
           for (byte[] qualifier : family.getValue()) {
-            if (authManager.matchPermission(user, tableName,
+            if (getAuthManager().matchPermission(user, tableName,
                 family.getKey(), qualifier, perm)) {
               return true;
             }
           }
         } else {
-          if (authManager.matchPermission(user, tableName, family.getKey(),
+          if (getAuthManager().matchPermission(user, tableName, 
family.getKey(),
               perm)) {
             return true;
           }
@@ -861,7 +627,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
           foundColumn = true;
           for (Action action: actions) {
             // Are there permissions for this user for the cell?
-            if (!authManager.authorize(user, getTableName(e), cell, action)) {
+            if (!getAuthManager().authorize(user, getTableName(e), cell, 
action)) {
               // We can stop if the cell ACL denies access
               return false;
             }
@@ -942,7 +708,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     CompoundConfiguration conf = new CompoundConfiguration();
     conf.add(env.getConfiguration());
 
-    authorizationEnabled = isAuthorizationSupported(conf);
+    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
     if (!authorizationEnabled) {
       LOG.warn("The AccessController has been loaded with authorization checks 
disabled.");
     }
@@ -976,26 +742,14 @@ public class AccessController extends 
BaseMasterAndRegionObserver
 
     // set the user-provider.
     this.userProvider = UserProvider.instantiate(env.getConfiguration());
-
-    // If zk is null or IOException while obtaining auth manager,
-    // throw RuntimeException so that the coprocessor is unloaded.
-    if (zk != null) {
-      try {
-        this.authManager = TableAuthManager.getOrCreate(zk, 
env.getConfiguration());
-      } catch (IOException ioe) {
-        throw new RuntimeException("Error obtaining TableAuthManager", ioe);
-      }
-    } else {
-      throw new RuntimeException("Error obtaining TableAuthManager, zk found 
null.");
-    }
-
+    accessChecker = new AccessChecker(env.getConfiguration(), zk);
     tableAcls = new MapMaker().weakValues().makeMap();
   }
 
   @Override
   public void stop(CoprocessorEnvironment env) {
-    if (this.authManager != null) {
-      TableAuthManager.release(authManager);
+    if (getAuthManager()!= null) {
+      TableAuthManager.release(getAuthManager());
     }
   }
 
@@ -1007,7 +761,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     for (byte[] family: families) {
       familyMap.put(family, null);
     }
-    requireNamespacePermission("createTable", 
desc.getTableName().getNamespaceAsString(),
+    accessChecker.requireNamespacePermission(getActiveUser(),"createTable", 
desc.getTableName().getNamespaceAsString(),
         desc.getTableName(), familyMap, Action.CREATE);
   }
 
@@ -1059,7 +813,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, 
TableName tableName)
       throws IOException {
-    requirePermission("deleteTable", tableName, null, null, Action.ADMIN, 
Action.CREATE);
+    accessChecker.requirePermission(getActiveUser(),"deleteTable", tableName, 
null, null, Action.ADMIN, Action.CREATE);
   }
 
   @Override
@@ -1074,13 +828,13 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         return null;
       }
     });
-    this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
+    getAuthManager().getZKPermissionWatcher().deleteTableACLNode(tableName);
   }
 
   @Override
   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
       final TableName tableName) throws IOException {
-    requirePermission("truncateTable", tableName, null, null, Action.ADMIN, 
Action.CREATE);
+    accessChecker.requirePermission(getActiveUser(),"truncateTable", 
tableName, null, null, Action.ADMIN, Action.CREATE);
 
     final Configuration conf = c.getEnvironment().getConfiguration();
     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@@ -1118,7 +872,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, 
TableName tableName,
       HTableDescriptor htd) throws IOException {
-    requirePermission("modifyTable", tableName, null, null, Action.ADMIN, 
Action.CREATE);
+    accessChecker.requirePermission(getActiveUser(),"modifyTable", tableName, 
null, null, Action.ADMIN, Action.CREATE);
   }
 
   @Override
@@ -1143,21 +897,21 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, 
TableName tableName,
       HColumnDescriptor column) throws IOException {
-    requireTablePermission("addColumn", tableName, column.getName(), null, 
Action.ADMIN,
+    accessChecker.requireTablePermission(getActiveUser(),"addColumn", 
tableName, column.getName(), null, Action.ADMIN,
         Action.CREATE);
   }
 
   @Override
   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, 
TableName tableName,
       HColumnDescriptor descriptor) throws IOException {
-    requirePermission("modifyColumn", tableName, descriptor.getName(), null, 
Action.ADMIN,
+    accessChecker.requirePermission(getActiveUser(),"modifyColumn", tableName, 
descriptor.getName(), null, Action.ADMIN,
         Action.CREATE);
   }
 
   @Override
   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, 
TableName tableName,
       byte[] col) throws IOException {
-    requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, 
Action.CREATE);
+    accessChecker.requirePermission(getActiveUser(),"deleteColumn", tableName, 
col, null, Action.ADMIN, Action.CREATE);
   }
 
   @Override
@@ -1177,7 +931,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, 
TableName tableName)
       throws IOException {
-    requirePermission("enableTable", tableName, null, null, Action.ADMIN, 
Action.CREATE);
+    accessChecker.requirePermission(getActiveUser(),"enableTable", tableName, 
null, null, Action.ADMIN, Action.CREATE);
   }
 
   @Override
@@ -1191,7 +945,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       throw new AccessDeniedException("Not allowed to disable "
           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController 
installed");
     }
-    requirePermission("disableTable", tableName, null, null, Action.ADMIN, 
Action.CREATE);
+    accessChecker.requirePermission(getActiveUser(),"disableTable", tableName, 
null, null, Action.ADMIN, Action.CREATE);
   }
 
   @Override
@@ -1202,7 +956,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     if (!procEnv.isProcedureOwner(procId, getActiveUser())) {
       // If the user is not the procedure owner, then we should further probe 
whether
       // he can abort the procedure.
-      requirePermission("abortProcedure", Action.ADMIN);
+      accessChecker.requirePermission(getActiveUser(),"abortProcedure", 
Action.ADMIN);
     }
   }
 
@@ -1237,7 +991,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
           // If the user is not the procedure owner, then we should further 
probe whether
           // he can see the procedure.
-          requirePermission("listProcedures", Action.ADMIN);
+          accessChecker.requirePermission(getActiveUser(),"listProcedures", 
Action.ADMIN);
         }
       } catch (AccessDeniedException e) {
         itr.remove();
@@ -1248,31 +1002,31 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, 
HRegionInfo region,
       ServerName srcServer, ServerName destServer) throws IOException {
-    requirePermission("move", region.getTable(), null, null, Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"move", region.getTable(), 
null, null, Action.ADMIN);
   }
 
   @Override
   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, 
HRegionInfo regionInfo)
       throws IOException {
-    requirePermission("assign", regionInfo.getTable(), null, null, 
Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"assign", 
regionInfo.getTable(), null, null, Action.ADMIN);
   }
 
   @Override
   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, 
HRegionInfo regionInfo,
       boolean force) throws IOException {
-    requirePermission("unassign", regionInfo.getTable(), null, null, 
Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"unassign", 
regionInfo.getTable(), null, null, Action.ADMIN);
   }
 
   @Override
   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
       HRegionInfo regionInfo) throws IOException {
-    requirePermission("regionOffline", regionInfo.getTable(), null, null, 
Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"regionOffline", 
regionInfo.getTable(), null, null, Action.ADMIN);
   }
 
   @Override
   public boolean preSetSplitOrMergeEnabled(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       final boolean newValue, final Admin.MasterSwitchType switchType) throws 
IOException {
-    requirePermission("setSplitOrMergeEnabled", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"setSplitOrMergeEnabled", 
Action.ADMIN);
     return false;
   }
 
@@ -1284,26 +1038,26 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
       throws IOException {
-    requirePermission("balance", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"balance", Action.ADMIN);
   }
 
   @Override
   public boolean 
preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
       boolean newValue) throws IOException {
-    requirePermission("balanceSwitch", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"balanceSwitch", 
Action.ADMIN);
     return newValue;
   }
 
   @Override
   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
       throws IOException {
-    requirePermission("shutdown", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"shutdown", Action.ADMIN);
   }
 
   @Override
   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
       throws IOException {
-    requirePermission("stopMaster", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"stopMaster", 
Action.ADMIN);
   }
 
   @Override
@@ -1322,7 +1076,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> 
ctx,
       final SnapshotDescription snapshot, final HTableDescriptor 
hTableDescriptor)
       throws IOException {
-    requirePermission("snapshot " + snapshot.getName(), 
hTableDescriptor.getTableName(), null, null,
+    accessChecker.requirePermission(getActiveUser(),"snapshot " + 
snapshot.getName(), hTableDescriptor.getTableName(), null, null,
       Permission.Action.ADMIN);
   }
 
@@ -1334,9 +1088,9 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       // list it, if user is the owner of snapshot
       AuthResult result = AuthResult.allow("listSnapshot " + 
snapshot.getName(),
           "Snapshot owner check allowed", user, null, null, null);
-      logResult(result);
+      accessChecker.logResult(result);
     } else {
-      requirePermission("listSnapshot " + snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(getActiveUser(),"listSnapshot " + 
snapshot.getName(), Action.ADMIN);
     }
   }
 
@@ -1350,9 +1104,9 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       // Snapshot owner is allowed to create a table with the same name as the 
snapshot he took
       AuthResult result = AuthResult.allow("cloneSnapshot " + 
snapshot.getName(),
         "Snapshot owner check allowed", user, null, 
hTableDescriptor.getTableName(), null);
-      logResult(result);
+      accessChecker.logResult(result);
     } else {
-      requirePermission("cloneSnapshot " + snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(getActiveUser(),"cloneSnapshot " + 
snapshot.getName(), Action.ADMIN);
     }
   }
 
@@ -1361,10 +1115,10 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       final SnapshotDescription snapshot, final HTableDescriptor 
hTableDescriptor)
       throws IOException {
     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
-      requirePermission("restoreSnapshot " + snapshot.getName(), 
hTableDescriptor.getTableName(), null, null,
+      accessChecker.requirePermission(getActiveUser(),"restoreSnapshot " + 
snapshot.getName(), hTableDescriptor.getTableName(), null, null,
         Permission.Action.ADMIN);
     } else {
-      requirePermission("restoreSnapshot " + snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(getActiveUser(),"restoreSnapshot " + 
snapshot.getName(), Action.ADMIN);
     }
   }
 
@@ -1376,22 +1130,22 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       // Snapshot owner is allowed to delete the snapshot
       AuthResult result = AuthResult.allow("deleteSnapshot " + 
snapshot.getName(),
           "Snapshot owner check allowed", user, null, null, null);
-      logResult(result);
+      accessChecker.logResult(result);
     } else {
-      requirePermission("deleteSnapshot " + snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(getActiveUser(),"deleteSnapshot " + 
snapshot.getName(), Action.ADMIN);
     }
   }
 
   @Override
   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> 
ctx,
       NamespaceDescriptor ns) throws IOException {
-    requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
+    accessChecker.requireGlobalPermission(getActiveUser(),"createNamespace", 
Action.ADMIN, ns.getName());
   }
 
   @Override
   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> 
ctx, String namespace)
       throws IOException {
-    requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
+    accessChecker.requireGlobalPermission(getActiveUser(),"deleteNamespace", 
Action.ADMIN, namespace);
   }
 
   @Override
@@ -1406,7 +1160,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         return null;
       }
     });
-    
this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
+    
getAuthManager().getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
     LOG.info(namespace + " entry deleted in " + 
AccessControlLists.ACL_TABLE_NAME + " table.");
   }
 
@@ -1415,13 +1169,13 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       NamespaceDescriptor ns) throws IOException {
     // We require only global permission so that
     // a user with NS admin cannot altering namespace configurations. i.e. 
namespace quota
-    requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
+    accessChecker.requireGlobalPermission(getActiveUser(),"modifyNamespace", 
Action.ADMIN, ns.getName());
   }
 
   @Override
   public void 
preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 
String namespace)
       throws IOException {
-    requireNamespacePermission("getNamespaceDescriptor", namespace, 
Action.ADMIN);
+    
accessChecker.requireNamespacePermission(getActiveUser(),"getNamespaceDescriptor",
 namespace, Action.ADMIN);
   }
 
   @Override
@@ -1433,7 +1187,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     while (itr.hasNext()) {
       NamespaceDescriptor desc = itr.next();
       try {
-        requireNamespacePermission("listNamespaces", desc.getName(), 
Action.ADMIN);
+        
accessChecker.requireNamespacePermission(getActiveUser(),"listNamespaces", 
desc.getName(), Action.ADMIN);
       } catch (AccessDeniedException e) {
         itr.remove();
       }
@@ -1443,7 +1197,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preTableFlush(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       final TableName tableName) throws IOException {
-    requirePermission("flushTable", tableName, null, null, Action.ADMIN, 
Action.CREATE);
+    accessChecker.requirePermission(getActiveUser(),"flushTable", tableName, 
null, null, Action.ADMIN, Action.CREATE);
   }
 
   /* ---- RegionObserver implementation ---- */
@@ -1460,7 +1214,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       if (regionInfo.getTable().isSystemTable()) {
         checkSystemOrSuperUser();
       } else {
-        requirePermission("preOpen", Action.ADMIN);
+        accessChecker.requirePermission(getActiveUser(),"preOpen", 
Action.ADMIN);
       }
     }
   }
@@ -1505,26 +1259,26 @@ public class AccessController extends 
BaseMasterAndRegionObserver
 
   @Override
   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws 
IOException {
-    requirePermission("flush", getTableName(e.getEnvironment()), null, null, 
Action.ADMIN,
+    accessChecker.requirePermission(getActiveUser(),"flush", 
getTableName(e.getEnvironment()), null, null, Action.ADMIN,
         Action.CREATE);
   }
 
   @Override
   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws 
IOException {
-    requirePermission("split", getTableName(e.getEnvironment()), null, null, 
Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"split", 
getTableName(e.getEnvironment()), null, null, Action.ADMIN);
   }
 
   @Override
   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
       byte[] splitRow) throws IOException {
-    requirePermission("split", getTableName(e.getEnvironment()), null, null, 
Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"split", 
getTableName(e.getEnvironment()), null, null, Action.ADMIN);
   }
 
   @Override
   public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
       final Store store, final InternalScanner scanner, final ScanType 
scanType)
           throws IOException {
-    requirePermission("compact", getTableName(e.getEnvironment()), null, null, 
Action.ADMIN,
+    accessChecker.requirePermission(getActiveUser(),"compact", 
getTableName(e.getEnvironment()), null, null, Action.ADMIN,
         Action.CREATE);
     return scanner;
   }
@@ -1544,7 +1298,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         families, HConstants.LATEST_TIMESTAMP, Action.READ));
       authResult.setReason("Covering cell set");
     }
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (authorizationEnabled && !authResult.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " +
         authResult.toContextString());
@@ -1594,7 +1348,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
           authResult.setReason("Access allowed with filter");
           // Only wrap the filter if we are enforcing authorizations
           if (authorizationEnabled) {
-            Filter ourFilter = new AccessControlFilter(authManager, user, 
table,
+            Filter ourFilter = new AccessControlFilter(getAuthManager(), user, 
table,
               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
               cfVsMaxVersions);
             // wrap any existing filter
@@ -1624,7 +1378,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         authResult.setReason("Access allowed with filter");
         // Only wrap the filter if we are enforcing authorizations
         if (authorizationEnabled) {
-          Filter ourFilter = new AccessControlFilter(authManager, user, table,
+          Filter ourFilter = new AccessControlFilter(getAuthManager(), user, 
table,
             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
           // wrap any existing filter
           if (filter != null) {
@@ -1646,7 +1400,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
       }
     }
 
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (authorizationEnabled && !authResult.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions for user '"
           + (user != null ? user.getShortName() : "null")
@@ -1683,7 +1437,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     RegionCoprocessorEnvironment env = c.getEnvironment();
     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, 
Action.WRITE);
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         put.setAttribute(CHECK_COVERING_PERM, TRUE);
@@ -1728,7 +1482,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     Map<byte[],? extends Collection<Cell>> families = 
delete.getFamilyCellMap();
     User user = getActiveUser();
     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, 
families, Action.WRITE);
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
@@ -1765,7 +1519,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
             authResult = AuthResult.deny(opType.toString(), "Covering cell 
set",
               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
           }
-          logResult(authResult);
+          accessChecker.logResult(authResult);
           if (authorizationEnabled && !authResult.isAllowed()) {
             throw new AccessDeniedException("Insufficient permissions "
               + authResult.toContextString());
@@ -1798,7 +1552,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, 
qualifier);
     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, 
families,
       Action.READ, Action.WRITE);
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         put.setAttribute(CHECK_COVERING_PERM, TRUE);
@@ -1838,7 +1592,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), 
"Covering cell set",
             getActiveUser(), Action.READ, table, families);
       }
-      logResult(authResult);
+      accessChecker.logResult(authResult);
       if (authorizationEnabled && !authResult.isAllowed()) {
         throw new AccessDeniedException("Insufficient permissions " + 
authResult.toContextString());
       }
@@ -1864,7 +1618,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     User user = getActiveUser();
     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, 
env, families,
         Action.READ, Action.WRITE);
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
@@ -1896,7 +1650,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), 
"Covering cell set",
             getActiveUser(), Action.READ, table, families);
       }
-      logResult(authResult);
+      accessChecker.logResult(authResult);
       if (authorizationEnabled && !authResult.isAllowed()) {
         throw new AccessDeniedException("Insufficient permissions " + 
authResult.toContextString());
       }
@@ -1921,7 +1675,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
       authResult.setReason("Covering cell set");
     }
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (authorizationEnabled && !authResult.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " + 
authResult.toContextString());
     }
@@ -1938,7 +1692,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     RegionCoprocessorEnvironment env = c.getEnvironment();
     Map<byte[],? extends Collection<Cell>> families = 
append.getFamilyCellMap();
     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, 
families, Action.WRITE);
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         append.setAttribute(CHECK_COVERING_PERM, TRUE);
@@ -1976,7 +1730,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell 
set",
             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
       }
-      logResult(authResult);
+      accessChecker.logResult(authResult);
       if (authorizationEnabled && !authResult.isAllowed()) {
         throw new AccessDeniedException("Insufficient permissions " +
           authResult.toContextString());
@@ -1998,7 +1752,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     Map<byte[],? extends Collection<Cell>> families = 
increment.getFamilyCellMap();
     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, 
families,
       Action.WRITE);
-    logResult(authResult);
+    accessChecker.logResult(authResult);
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
@@ -2036,7 +1790,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering 
cell set",
             getActiveUser(), Action.WRITE, table, 
increment.getFamilyCellMap());
       }
-      logResult(authResult);
+      accessChecker.logResult(authResult);
       if (authorizationEnabled && !authResult.isAllowed()) {
         throw new AccessDeniedException("Insufficient permissions " +
           authResult.toContextString());
@@ -2178,7 +1932,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> 
ctx,
       List<Pair<byte[], String>> familyPaths) throws IOException {
     for(Pair<byte[],String> el : familyPaths) {
-      requirePermission("preBulkLoadHFile",
+      accessChecker.requirePermission(getActiveUser(),"preBulkLoadHFile",
           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
           el.getFirst(),
           null,
@@ -2196,7 +1950,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> 
ctx,
                                  PrepareBulkLoadRequest request) throws 
IOException {
-    requireAccess("prePareBulkLoad",
+    accessChecker.requireAccess(getActiveUser(),"prePareBulkLoad",
         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), 
Action.CREATE);
   }
 
@@ -2210,7 +1964,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> 
ctx,
                                  CleanupBulkLoadRequest request) throws 
IOException {
-    requireAccess("preCleanupBulkLoad",
+    accessChecker.requireAccess(getActiveUser(),"preCleanupBulkLoad",
         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), 
Action.CREATE);
   }
 
@@ -2222,7 +1976,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     // Don't intercept calls to our own AccessControlService, we check for
     // appropriate permissions in the service handlers
     if (shouldCheckExecPermission && !(service instanceof 
AccessControlService)) {
-      requirePermission("invoke(" + service.getDescriptorForType().getName() + 
"." +
+      accessChecker.requirePermission(getActiveUser(),"invoke(" + 
service.getDescriptorForType().getName() + "." +
         methodName + ")",
         getTableName(ctx.getEnvironment()), null, null,
         Action.EXEC);
@@ -2256,11 +2010,11 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         switch(request.getUserPermission().getPermission().getType()) {
           case Global :
           case Table :
-            requirePermission("grant", perm.getTableName(), perm.getFamily(),
+            accessChecker.requirePermission(getActiveUser(),"grant", 
perm.getTableName(), perm.getFamily(),
               perm.getQualifier(), Action.ADMIN);
             break;
           case Namespace :
-            requireNamespacePermission("grant", perm.getNamespace(), 
Action.ADMIN);
+            accessChecker.requireNamespacePermission(getActiveUser(),"grant", 
perm.getNamespace(), Action.ADMIN);
            break;
         }
 
@@ -2309,11 +2063,11 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         switch(request.getUserPermission().getPermission().getType()) {
           case Global :
           case Table :
-            requirePermission("revoke", perm.getTableName(), perm.getFamily(),
+            accessChecker.requirePermission(getActiveUser(),"revoke", 
perm.getTableName(), perm.getFamily(),
               perm.getQualifier(), Action.ADMIN);
             break;
           case Namespace :
-            requireNamespacePermission("revoke", perm.getNamespace(), 
Action.ADMIN);
+            accessChecker.requireNamespacePermission(getActiveUser(),"revoke", 
perm.getNamespace(), Action.ADMIN);
             break;
         }
 
@@ -2356,7 +2110,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
           final TableName table = request.hasTableName() ?
             ProtobufUtil.toTableName(request.getTableName()) : null;
-          requirePermission("userPermissions", table, null, null, 
Action.ADMIN);
+          accessChecker.requirePermission(getActiveUser(),"userPermissions", 
table, null, null, Action.ADMIN);
           perms = User.runAsLoginUser(new 
PrivilegedExceptionAction<List<UserPermission>>() {
             @Override
             public List<UserPermission> run() throws Exception {
@@ -2365,7 +2119,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
           });
         } else if (request.getType() == 
AccessControlProtos.Permission.Type.Namespace) {
           final String namespace = request.getNamespaceName().toStringUtf8();
-          requireNamespacePermission("userPermissions", namespace, 
Action.ADMIN);
+          
accessChecker.requireNamespacePermission(getActiveUser(),"userPermissions", 
namespace, Action.ADMIN);
           perms = User.runAsLoginUser(new 
PrivilegedExceptionAction<List<UserPermission>>() {
             @Override
             public List<UserPermission> run() throws Exception {
@@ -2374,7 +2128,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
             }
           });
         } else {
-          requirePermission("userPermissions", Action.ADMIN);
+          accessChecker.requirePermission(getActiveUser(),"userPermissions", 
Action.ADMIN);
           perms = User.runAsLoginUser(new 
PrivilegedExceptionAction<List<UserPermission>>() {
             @Override
             public List<UserPermission> run() throws Exception {
@@ -2440,7 +2194,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
 
             AuthResult result = permissionGranted("checkPermissions", user, 
action, regionEnv,
               familyMap);
-            logResult(result);
+            accessChecker.logResult(result);
             if (!result.isAllowed()) {
               // Even if passive we need to throw an exception here, we 
support checking
               // effective permissions, so throw unconditionally
@@ -2455,14 +2209,14 @@ public class AccessController extends 
BaseMasterAndRegionObserver
 
           for (Action action : permission.getActions()) {
             AuthResult result;
-            if (authManager.authorize(user, action)) {
+            if (getAuthManager().authorize(user, action)) {
               result = AuthResult.allow("checkPermissions", "Global action 
allowed", user,
                 action, null, null);
             } else {
               result = AuthResult.deny("checkPermissions", "Global action 
denied", user, action,
                 null, null);
             }
-            logResult(result);
+            accessChecker.logResult(result);
             if (!result.isAllowed()) {
               // Even if passive we need to throw an exception here, we 
support checking
               // effective permissions, so throw unconditionally
@@ -2507,7 +2261,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, 
boolean abortRequested)
       throws IOException {
-    requirePermission("preClose", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"preClose", Action.ADMIN);
   }
 
   private void checkSystemOrSuperUser() throws IOException {
@@ -2526,7 +2280,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   public void preStopRegionServer(
       ObserverContext<RegionServerCoprocessorEnvironment> env)
       throws IOException {
-    requirePermission("preStopRegionServer", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"preStopRegionServer", 
Action.ADMIN);
   }
 
   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] 
family,
@@ -2555,7 +2309,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
         if (masterServices.getTableDescriptors().get(tableName) == null) {
           continue;
         }
-        requirePermission("getTableDescriptors", tableName, null, null,
+        accessChecker.requirePermission(getActiveUser(),"getTableDescriptors", 
tableName, null, null,
             Action.ADMIN, Action.CREATE);
       }
     }
@@ -2576,7 +2330,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     while (itr.hasNext()) {
       HTableDescriptor htd = itr.next();
       try {
-        requirePermission("getTableDescriptors", htd.getTableName(), null, 
null,
+        accessChecker.requirePermission(getActiveUser(),"getTableDescriptors", 
htd.getTableName(), null, null,
             Action.ADMIN, Action.CREATE);
       } catch (AccessDeniedException e) {
         itr.remove();
@@ -2592,7 +2346,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
     while (itr.hasNext()) {
       HTableDescriptor htd = itr.next();
       try {
-        requireAccess("getTableNames", htd.getTableName(), Action.values());
+        accessChecker.requireAccess(getActiveUser(),"getTableNames", 
htd.getTableName(), Action.values());
       } catch (AccessDeniedException e) {
         itr.remove();
       }
@@ -2602,14 +2356,14 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preDispatchMerge(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       HRegionInfo regionA, HRegionInfo regionB) throws IOException {
-    requirePermission("mergeRegions", regionA.getTable(), null, null,
+    accessChecker.requirePermission(getActiveUser(),"mergeRegions", 
regionA.getTable(), null, null,
       Action.ADMIN);
   }
 
   @Override
   public void 
preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
       throws IOException {
-    requirePermission("clearDeadServers", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"clearDeadServers", 
Action.ADMIN);
   }
 
   @Override
@@ -2619,7 +2373,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> 
ctx, Region regionA,
       Region regionB) throws IOException {
-    requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), 
null, null,
+    accessChecker.requirePermission(getActiveUser(),"mergeRegions", 
regionA.getTableDesc().getTableName(), null, null,
       Action.ADMIN);
   }
 
@@ -2646,7 +2400,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void 
preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
       throws IOException {
-    requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"preRollLogWriterRequest", 
Permission.Action.ADMIN);
   }
 
   @Override
@@ -2662,7 +2416,7 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void 
preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
       List<WALEntry> entries, CellScanner cells) throws IOException {
-    requirePermission("replicateLogEntries", Action.WRITE);
+    accessChecker.requirePermission(getActiveUser(),"replicateLogEntries", 
Action.WRITE);
   }
 
   @Override
@@ -2673,72 +2427,65 @@ public class AccessController extends 
BaseMasterAndRegionObserver
   @Override
   public void preSetUserQuota(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       final String userName, final Quotas quotas) throws IOException {
-    requirePermission("setUserQuota", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"setUserQuota", 
Action.ADMIN);
   }
 
   @Override
   public void preSetUserQuota(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       final String userName, final TableName tableName, final Quotas quotas) 
throws IOException {
-    requirePermission("setUserTableQuota", tableName, null, null, 
Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"setUserTableQuota", 
tableName, null, null, Action.ADMIN);
   }
 
   @Override
   public void preSetUserQuota(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       final String userName, final String namespace, final Quotas quotas) 
throws IOException {
-    requirePermission("setUserNamespaceQuota", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"setUserNamespaceQuota", 
Action.ADMIN);
   }
 
   @Override
   public void preSetTableQuota(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       final TableName tableName, final Quotas quotas) throws IOException {
-    requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"setTableQuota", 
tableName, null, null, Action.ADMIN);
   }
 
   @Override
   public void preSetNamespaceQuota(final 
ObserverContext<MasterCoprocessorEnvironment> ctx,
       final String namespace, final Quotas quotas) throws IOException {
-    requirePermission("setNamespaceQuota", Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(),"setNamespaceQuota", 
Action.ADMIN);
   }
 
   @Override
   public void 
preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
       Set<Address> servers, Set<TableName> tables, String targetGroup) throws 
IOException {
-    requirePermission("moveServersAndTables", Action.ADMIN);
   }
 
   @Override
   public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
       Set<Address> servers, String targetGroup) throws IOException {
-    requirePermission("moveServers", Action.ADMIN);
   }
 
   @Override
   public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
       Set<TableName> tables, String targetGroup) throws IOException {
-    requirePermission("moveTables", Action.ADMIN);
   }
 
   @Override
   public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> 
ctx,
       Set<Address> servers) throws IOException {
-    requirePermission("removeServers", Action.ADMIN);
   }
 
   @Override
   public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
       String name) throws IOException {
-    requirePermission("addRSGroup", Action.ADMIN);
   }
 
   @Override
   public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> 
ctx,
       String name) throws IOException {
-    requirePermission("removeRSGroup", Action.ADMIN);
   }
 
   @Override
   public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> 
ctx,
       String groupName) throws IOException {
-    requirePermission("balanceRSGroup", Action.ADMIN);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
index dd1059c..a12757d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
@@ -758,7 +758,7 @@ public class TableAuthManager implements Closeable {
   }
 
   @VisibleForTesting
-  static int getTotalRefCount() {
+  public static int getTotalRefCount() {
     int total = 0;
     for (int count : refCount.values()) {
       total += count;

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 0fa3207..9692ecd 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -101,6 +101,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.AccessController;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -139,8 +140,8 @@ public class VisibilityController extends 
BaseMasterAndRegionObserver implements
 
   private VisibilityLabelService visibilityLabelService; // FindBugs: 
MT_CORRECTNESS FIX!!!
 
-  /** if we are active, usually true, only not true if 
"hbase.security.authorization"
-    has been set to false in site configuration */
+  /** if we are active, usually false, only true if 
"hbase.security.authorization"
+    has been set to true in site configuration */
   boolean authorizationEnabled;
 
   // Add to this list if there are any reserved tag types
@@ -151,19 +152,15 @@ public class VisibilityController extends 
BaseMasterAndRegionObserver implements
     RESERVED_VIS_TAG_TYPES.add(TagType.STRING_VIS_TAG_TYPE);
   }
 
-  public static boolean isAuthorizationSupported(Configuration conf) {
-    return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
-  }
-
   public static boolean isCellAuthorizationSupported(Configuration conf) {
-    return isAuthorizationSupported(conf);
+    return AccessChecker.isAuthorizationSupported(conf);
   }
 
   @Override
   public void start(CoprocessorEnvironment env) throws IOException {
     this.conf = env.getConfiguration();
 
-    authorizationEnabled = isAuthorizationSupported(conf);
+    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
     if (!authorizationEnabled) {
       LOG.warn("The VisibilityController has been loaded with authorization 
checks disabled.");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
index 6426ec9..e00a31e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
@@ -64,6 +64,7 @@ import 
org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator;
 import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
 import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
 import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -117,9 +118,7 @@ public class TestImportTSVWithVisibilityLabels implements 
Configurable {
     conf = util.getConfiguration();
     SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { 
"supergroup" });
     conf.set("hbase.superuser", "admin,"+User.getCurrent().getName());
-    conf.setInt("hfile.format.version", 3);
-    conf.set("hbase.coprocessor.master.classes", 
VisibilityController.class.getName());
-    conf.set("hbase.coprocessor.region.classes", 
VisibilityController.class.getName());
+    VisibilityTestUtil.enableVisiblityLabels(conf);
     conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, 
SimpleScanLabelGenerator.class,
         ScanLabelGenerator.class);
     util.setJobWithoutMRCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index 2c18a82..7a26717 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -104,6 +104,7 @@ public class SecureTestUtil {
     conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 
AccessController.class.getName());
     // Need HFile V3 for tags for security features
     conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
+    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
     configureSuperuser(conf);
   }
 
@@ -127,6 +128,11 @@ public class SecureTestUtil {
     if (conf.getInt(HFile.FORMAT_VERSION_KEY, 2) < 
HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
       throw new RuntimeException("Post 0.96 security features require HFile 
version >= 3");
     }
+
+    if (!conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, false)) {
+      throw new RuntimeException("Post 1.5.0 security features require set "
+          + User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY + " to true");
+    }
   }
 
   public static void checkTablePerms(Configuration conf, TableName table, 
byte[] family, byte[] column,

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 5071ca0..c1d6e8f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -2967,81 +2967,6 @@ public class TestAccessController extends SecureTestUtil 
{
   }
 
   @Test
-  public void testMoveServers() throws Exception {
-    AccessTestAction action1 = new AccessTestAction() {
-      @Override
-      public Object run() throws Exception {
-        
ACCESS_CONTROLLER.preMoveServers(ObserverContext.createAndPrepare(CP_ENV, null),
-            null, null);
-        return null;
-      }
-    };
-
-    verifyAllowed(action1, SUPERUSER, USER_ADMIN);
-    verifyDenied(action1, USER_CREATE, USER_RW, USER_RO, USER_NONE, 
USER_OWNER);
-  }
-
-  @Test
-  public void testMoveTables() throws Exception {
-    AccessTestAction action1 = new AccessTestAction() {
-      @Override
-      public Object run() throws Exception {
-        
ACCESS_CONTROLLER.preMoveTables(ObserverContext.createAndPrepare(CP_ENV, null),
-            null, null);
-        return null;
-      }
-    };
-
-    verifyAllowed(action1, SUPERUSER, USER_ADMIN);
-    verifyDenied(action1, USER_CREATE, USER_RW, USER_RO, USER_NONE, 
USER_OWNER);
-  }
-
-  @Test
-  public void testAddGroup() throws Exception {
-    AccessTestAction action1 = new AccessTestAction() {
-      @Override
-      public Object run() throws Exception {
-        
ACCESS_CONTROLLER.preAddRSGroup(ObserverContext.createAndPrepare(CP_ENV, null),
-            null);
-        return null;
-      }
-    };
-
-    verifyAllowed(action1, SUPERUSER, USER_ADMIN);
-    verifyDenied(action1, USER_CREATE, USER_RW, USER_RO, USER_NONE, 
USER_OWNER);
-  }
-
-  @Test
-  public void testRemoveGroup() throws Exception {
-    AccessTestAction action1 = new AccessTestAction() {
-      @Override
-      public Object run() throws Exception {
-        
ACCESS_CONTROLLER.preRemoveRSGroup(ObserverContext.createAndPrepare(CP_ENV, 
null),
-            null);
-        return null;
-      }
-    };
-
-    verifyAllowed(action1, SUPERUSER, USER_ADMIN);
-    verifyDenied(action1, USER_CREATE, USER_RW, USER_RO, USER_NONE, 
USER_OWNER);
-  }
-
-  @Test
-  public void testBalanceGroup() throws Exception {
-    AccessTestAction action1 = new AccessTestAction() {
-      @Override
-      public Object run() throws Exception {
-        
ACCESS_CONTROLLER.preBalanceRSGroup(ObserverContext.createAndPrepare(CP_ENV, 
null),
-            null);
-        return null;
-      }
-    };
-
-    verifyAllowed(action1, SUPERUSER, USER_ADMIN);
-    verifyDenied(action1, USER_CREATE, USER_RW, USER_RO, USER_NONE, 
USER_OWNER);
-  }
-
-  @Test
   public void testGetClusterStatus() throws Exception {
     AccessTestAction action = new AccessTestAction() {
       @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/VisibilityTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/VisibilityTestUtil.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/VisibilityTestUtil.java
index 7dbe256..4e2c4b7 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/VisibilityTestUtil.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/VisibilityTestUtil.java
@@ -14,7 +14,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-
+import org.apache.hadoop.hbase.security.User;
 
 /**
  * Utility methods for testing visibility labels.
@@ -23,6 +23,7 @@ public class VisibilityTestUtil {
 
   public static void enableVisiblityLabels(Configuration conf) throws 
IOException {
     conf.setInt("hfile.format.version", 3);
+    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
     appendCoprocessor(conf, CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
       VisibilityController.class.getName());
     appendCoprocessor(conf, CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,

Reply via email to