HBASE-19483 Add proper privilege check for rsgroup commands

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6f1dd258
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6f1dd258
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6f1dd258

Branch: refs/heads/branch-1
Commit: 6f1dd258b17c3bed1b49a68bd7fd4530fbd1ca8c
Parents: e6453ab
Author: Guangxu Cheng <guangxuch...@gmail.com>
Authored: Wed Jan 10 14:14:11 2018 +0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Wed Jan 10 02:25:47 2018 -0800

----------------------------------------------------------------------
 ...tegrationTestIngestWithVisibilityLabels.java |   5 +-
 ...egrationTestBigLinkedListWithVisibility.java |   5 +-
 ...tionTestWithCellVisibilityLoadAndVerify.java |   5 +-
 .../hbase/rest/TestScannersWithLabels.java      |   5 +-
 .../hbase/rsgroup/RSGroupAdminEndpoint.java     |  50 +-
 .../hbase/rsgroup/TestRSGroupsWithACL.java      | 358 ++++++++++
 .../hadoop/hbase/master/MasterRpcServices.java  |   3 +-
 .../hbase/security/access/AccessChecker.java    | 300 ++++++++
 .../security/access/AccessControlLists.java     |   4 +-
 .../hbase/security/access/AccessController.java | 477 +++----------
 .../hbase/security/access/TableAuthManager.java |   2 +-
 .../visibility/VisibilityController.java        |  13 +-
 .../TestImportTSVWithVisibilityLabels.java      |   5 +-
 .../hbase/security/access/SecureTestUtil.java   |   6 +
 .../security/access/TestAccessController.java   |  75 --
 .../security/visibility/VisibilityTestUtil.java |   3 +-
 ...TestThriftHBaseServiceHandlerWithLabels.java | 707 +++++++++----------
 .../asciidoc/_chapters/appendix_acl_matrix.adoc |  12 +
 src/main/asciidoc/_chapters/security.adoc       |  16 +
 19 files changed, 1228 insertions(+), 823 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithVisibilityLabels.java
----------------------------------------------------------------------
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithVisibilityLabels.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithVisibilityLabels.java
index b942918..6d2f783 100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithVisibilityLabels.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithVisibilityLabels.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.security.User;
 import 
org.apache.hadoop.hbase.security.visibility.LoadTestDataGeneratorWithVisibilityLabels;
 import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.LoadTestTool;
 import org.junit.experimental.categories.Category;
@@ -76,9 +77,7 @@ public class IntegrationTestIngestWithVisibilityLabels 
extends IntegrationTestIn
   public void setUpCluster() throws Exception {
     util = getTestingUtil(null);
     Configuration conf = util.getConfiguration();
-    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
-    conf.set("hbase.coprocessor.master.classes", 
VisibilityController.class.getName());
-    conf.set("hbase.coprocessor.region.classes", 
VisibilityController.class.getName());
+    VisibilityTestUtil.enableVisiblityLabels(conf);
     conf.set("hbase.superuser", "admin," + User.getCurrent().getName());
     super.setUpCluster();
     addLabels();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
----------------------------------------------------------------------
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
index 8e93eaa..5f1f5b9 100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
@@ -369,9 +370,7 @@ public class IntegrationTestBigLinkedListWithVisibility 
extends IntegrationTestB
   public void setUpCluster() throws Exception {
     util = getTestingUtil(null);
     Configuration conf = util.getConfiguration();
-    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
-    conf.set("hbase.coprocessor.master.classes", 
VisibilityController.class.getName());
-    conf.set("hbase.coprocessor.region.classes", 
VisibilityController.class.getName());
+    VisibilityTestUtil.enableVisiblityLabels(conf);
     conf.set("hbase.superuser", User.getCurrent().getName());
     conf.setBoolean("dfs.permissions", false);
     USER = User.createUserForTesting(conf, userName, new String[] {});

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
----------------------------------------------------------------------
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
index 9db5a70..f119541 100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -115,9 +116,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify 
extends IntegrationT
   public void setUpCluster() throws Exception {
     util = getTestingUtil(null);
     Configuration conf = util.getConfiguration();
-    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
-    conf.set("hbase.coprocessor.master.classes", 
VisibilityController.class.getName());
-    conf.set("hbase.coprocessor.region.classes", 
VisibilityController.class.getName());
+    VisibilityTestUtil.enableVisiblityLabels(conf);
     conf.set("hbase.superuser", User.getCurrent().getName());
     conf.setBoolean("dfs.permissions", false);
     super.setUpCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannersWithLabels.java
----------------------------------------------------------------------
diff --git 
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannersWithLabels.java
 
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannersWithLabels.java
index c72517c..356889a 100644
--- 
a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannersWithLabels.java
+++ 
b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannersWithLabels.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -130,10 +131,8 @@ public class TestScannersWithLabels {
     conf = TEST_UTIL.getConfiguration();
     conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS,
         SimpleScanLabelGenerator.class, ScanLabelGenerator.class);
-    conf.setInt("hfile.format.version", 3);
     conf.set("hbase.superuser", SUPERUSER.getShortName());
-    conf.set("hbase.coprocessor.master.classes", 
VisibilityController.class.getName());
-    conf.set("hbase.coprocessor.region.classes", 
VisibilityController.class.getName());
+    VisibilityTestUtil.enableVisiblityLabels(conf);
     TEST_UTIL.startMiniCluster(1);
     // Wait for the labels table to become available
     
TEST_UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 
50000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
 
b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
index a09010e..fb9c6a6 100644
--- 
a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
+++ 
b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -83,6 +84,12 @@ import 
org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGro
 import 
org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
 import 
org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
 import org.apache.hadoop.hbase.protobuf.generated.TableProtos;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.access.TableAuthManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 public class RSGroupAdminEndpoint extends RSGroupAdminService
     implements CoprocessorService, Coprocessor, MasterObserver {
@@ -91,6 +98,10 @@ public class RSGroupAdminEndpoint extends RSGroupAdminService
 
   private static RSGroupInfoManagerImpl groupInfoManager;
   private RSGroupAdminServer groupAdminServer;
+  private AccessChecker accessChecker;
+
+  /** Provider for mapping principal names to Users */
+  private UserProvider userProvider;
 
   @Override
   public void start(CoprocessorEnvironment env) throws IOException {
@@ -103,10 +114,18 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
     if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) {
       throw new IOException("Configured balancer is not a GroupableBalancer");
     }
+    ZooKeeperWatcher zk = menv.getMasterServices().getZooKeeper();
+    accessChecker = new AccessChecker(env.getConfiguration(), zk);
+
+    // set the user-provider.
+    this.userProvider = UserProvider.instantiate(env.getConfiguration());
   }
 
   @Override
-  public void stop(CoprocessorEnvironment env) throws IOException {
+  public void stop(CoprocessorEnvironment env) {
+    if (accessChecker.getAuthManager() != null) {
+      TableAuthManager.release(accessChecker.getAuthManager());
+    }
   }
 
   @Override
@@ -141,6 +160,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
       GetRSGroupInfoResponse.Builder builder =
           GetRSGroupInfoResponse.newBuilder();
       RSGroupInfo RSGroupInfo = 
groupAdminServer.getRSGroupInfo(request.getRSGroupName());
+      checkPermission("getRSGroupInfo");
       if(RSGroupInfo != null) {
         
builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
       }
@@ -160,6 +180,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
       GetRSGroupInfoOfTableResponse.Builder builder =
           GetRSGroupInfoOfTableResponse.newBuilder();
       TableName tableName = ProtobufUtil.toTableName(request.getTableName());
+      checkPermission("getRSGroupInfoOfTable");
       RSGroupInfo RSGroupInfo = 
groupAdminServer.getRSGroupInfoOfTable(tableName);
       if (RSGroupInfo == null) {
         response = builder.build();
@@ -184,6 +205,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
       for(HBaseProtos.ServerName el: request.getServersList()) {
         servers.add(Address.fromParts(el.getHostName(), el.getPort()));
       }
+      checkPermission("moveServers");
       groupAdminServer.moveServers(servers, request.getTargetGroup());
       response = builder.build();
     } catch (IOException e) {
@@ -204,6 +226,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
       for(TableProtos.TableName tableName: request.getTableNameList()) {
         tables.add(ProtobufUtil.toTableName(tableName));
       }
+      checkPermission("moveTables");
       groupAdminServer.moveTables(tables, request.getTargetGroup());
       response = builder.build();
     } catch (IOException e) {
@@ -225,6 +248,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
       for (TableProtos.TableName tableName : request.getTableNameList()) {
         tables.add(ProtobufUtil.toTableName(tableName));
       }
+      checkPermission("moveServersAndTables");
       groupAdminServer.moveServersAndTables(servers, tables, 
request.getTargetGroup());
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -240,6 +264,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
     try {
       AddRSGroupResponse.Builder builder =
           AddRSGroupResponse.newBuilder();
+      checkPermission("addRSGroup");
       groupAdminServer.addRSGroup(request.getRSGroupName());
       response = builder.build();
     } catch (IOException e) {
@@ -256,6 +281,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
     try {
       RemoveRSGroupResponse.Builder builder =
           RemoveRSGroupResponse.newBuilder();
+      checkPermission("removeRSGroup");
       groupAdminServer.removeRSGroup(request.getRSGroupName());
       response = builder.build();
     } catch (IOException e) {
@@ -270,6 +296,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
                            RpcCallback<BalanceRSGroupResponse> done) {
     BalanceRSGroupResponse.Builder builder = 
BalanceRSGroupResponse.newBuilder();
     try {
+      checkPermission("balanceRSGroup");
       
builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName()));
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -286,6 +313,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
     try {
       ListRSGroupInfosResponse.Builder builder =
           ListRSGroupInfosResponse.newBuilder();
+      checkPermission("listRSGroupInfos");
       for(RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) {
         
builder.addRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
       }
@@ -304,6 +332,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
     try {
       Address server =
           Address.fromParts(request.getServer().getHostName(), 
request.getServer().getPort());
+      checkPermission("getRSGroupInfoOfServer");
       RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupOfServer(server);
       if (RSGroupInfo != null) {
         
builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
@@ -325,6 +354,7 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
       for (HBaseProtos.ServerName el : request.getServersList()) {
         servers.add(Address.fromParts(el.getHostName(), el.getPort()));
       }
+      checkPermission("removeServers");
       groupAdminServer.removeServers(servers);
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -1080,4 +1110,22 @@ public class RSGroupAdminEndpoint extends 
RSGroupAdminService
                                  String groupName, boolean balancerRan) throws 
IOException {
 
   }
+
+  public void checkPermission(String request) throws IOException {
+    accessChecker.requirePermission(getActiveUser(), request, 
Permission.Action.ADMIN);
+  }
+
+  /**
+   * Resolves the user that authorization checks apply to: the request user of the
+   * RPC call in progress, or the current user from the {@link UserProvider} when
+   * no request user is available.
+   */
+  private User getActiveUser() throws IOException {
+    User rpcUser = RpcServer.getRequestUser();
+    // for non-rpc handling, fallback to system user
+    return rpcUser != null ? rpcUser : userProvider.getCurrent();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
----------------------------------------------------------------------
diff --git 
a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
 
b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
new file mode 100644
index 0000000..3b15f1b
--- /dev/null
+++ 
b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.rsgroup;
+
+import static org.apache.hadoop.hbase.AuthUtil.toGroupEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.security.access.TableAuthManager;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs authorization checks for rsgroup operations, according to different
+ * levels of authorized users.
+ */
+@Category({SecurityTests.class})
+public class TestRSGroupsWithACL extends SecureTestUtil{
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestRSGroupsWithACL.class);
+  private static TableName TEST_TABLE = TableName.valueOf("testtable1");
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private static Configuration conf;
+
+  private static Connection systemUserConnection;
+  // user with all permissions
+  private static User SUPERUSER;
+  // user granted with all global permission
+  private static User USER_ADMIN;
+  // user with rw permissions on column family.
+  private static User USER_RW;
+  // user with read-only permissions
+  private static User USER_RO;
+  // user is table owner. will have all permissions on table
+  private static User USER_OWNER;
+  // user with create table permissions alone
+  private static User USER_CREATE;
+  // user with no permissions
+  private static User USER_NONE;
+
+  private static final String GROUP_ADMIN = "group_admin";
+  private static final String GROUP_CREATE = "group_create";
+  private static final String GROUP_READ = "group_read";
+  private static final String GROUP_WRITE = "group_write";
+
+  private static User USER_GROUP_ADMIN;
+  private static User USER_GROUP_CREATE;
+  private static User USER_GROUP_READ;
+  private static User USER_GROUP_WRITE;
+
+  private static byte[] TEST_FAMILY = Bytes.toBytes("f1");
+
+  private static RSGroupAdminEndpoint rsGroupAdminEndpoint;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+        RSGroupBasedLoadBalancer.class.getName());
+    // Enable security
+    enableSecurity(conf);
+    // Verify enableSecurity sets up what we require
+    verifyConfiguration(conf);
+    // Enable rsgroup
+    configureRSGroupAdminEndpoint(conf);
+
+    TEST_UTIL.startMiniCluster();
+    rsGroupAdminEndpoint = (RSGroupAdminEndpoint) 
TEST_UTIL.getMiniHBaseCluster().getMaster().
+        
getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class.getName());
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
+
+    // create a set of test users
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { 
"supergroup" });
+    USER_ADMIN = User.createUserForTesting(conf, "admin2", new String[0]);
+    USER_RW = User.createUserForTesting(conf, "rwuser", new String[0]);
+    USER_RO = User.createUserForTesting(conf, "rouser", new String[0]);
+    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
+    USER_CREATE = User.createUserForTesting(conf, "tbl_create", new String[0]);
+    USER_NONE = User.createUserForTesting(conf, "nouser", new String[0]);
+
+    USER_GROUP_ADMIN =
+        User.createUserForTesting(conf, "user_group_admin", new String[] { 
GROUP_ADMIN });
+    USER_GROUP_CREATE =
+        User.createUserForTesting(conf, "user_group_create", new String[] { 
GROUP_CREATE });
+    USER_GROUP_READ =
+        User.createUserForTesting(conf, "user_group_read", new String[] { 
GROUP_READ });
+    USER_GROUP_WRITE =
+        User.createUserForTesting(conf, "user_group_write", new String[] { 
GROUP_WRITE });
+
+    systemUserConnection = TEST_UTIL.getConnection();
+    setUpTableAndUserPermissions();
+  }
+
+  private static void setUpTableAndUserPermissions() throws Exception {
+    HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY);
+    hcd.setMaxVersions(100);
+    htd.addFamily(hcd);
+    htd.setOwner(USER_OWNER);
+    createTable(TEST_UTIL, htd, new byte[][] { Bytes.toBytes("s") });
+
+    // Set up initial grants
+    grantGlobal(TEST_UTIL, USER_ADMIN.getShortName(),
+        Permission.Action.ADMIN,
+        Permission.Action.CREATE,
+        Permission.Action.READ,
+        Permission.Action.WRITE);
+
+    grantOnTable(TEST_UTIL, USER_RW.getShortName(),
+        TEST_TABLE, TEST_FAMILY, null,
+        Permission.Action.READ,
+        Permission.Action.WRITE);
+
+    // USER_CREATE is USER_RW plus CREATE permissions
+    grantOnTable(TEST_UTIL, USER_CREATE.getShortName(),
+        TEST_TABLE, null, null,
+        Permission.Action.CREATE,
+        Permission.Action.READ,
+        Permission.Action.WRITE);
+
+    grantOnTable(TEST_UTIL, USER_RO.getShortName(),
+        TEST_TABLE, TEST_FAMILY, null,
+        Permission.Action.READ);
+
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_ADMIN), Permission.Action.ADMIN);
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_CREATE), 
Permission.Action.CREATE);
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_READ), Permission.Action.READ);
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_WRITE), Permission.Action.WRITE);
+
+    assertEquals(4, AccessControlLists.getTablePermissions(conf, 
TEST_TABLE).size());
+    try {
+      assertEquals(4, 
AccessControlClient.getUserPermissions(systemUserConnection,
+          TEST_TABLE.toString()).size());
+    } catch (Throwable e) {
+      LOG.error("error during call of AccessControlClient.getUserPermissions. 
", e);
+    }
+  }
+
+  private static void cleanUp() throws Exception {
+    // Clean the _acl_ table
+    try {
+      deleteTable(TEST_UTIL, TEST_TABLE);
+    } catch (TableNotFoundException ex) {
+      // Test deleted the table, no problem
+      LOG.info("Test deleted table " + TEST_TABLE);
+    }
+    // Verify all table/namespace permissions are erased
+    assertEquals(0, AccessControlLists.getTablePermissions(conf, 
TEST_TABLE).size());
+    assertEquals(0, AccessControlLists.getNamespacePermissions(conf,
+            TEST_TABLE.getNamespaceAsString()).size());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    cleanUp();
+    TEST_UTIL.shutdownMiniCluster();
+    int total = TableAuthManager.getTotalRefCount();
+    assertTrue("Unexpected reference count: " + total, total == 0);
+  }
+
+  private static void configureRSGroupAdminEndpoint(Configuration conf) {
+    String currentCoprocessors = 
conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
+    String coprocessors = RSGroupAdminEndpoint.class.getName();
+    if (currentCoprocessors != null) {
+      coprocessors += "," + currentCoprocessors;
+    }
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, coprocessors);
+    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+        RSGroupBasedLoadBalancer.class.getName());
+  }
+
+  @Test
+  public void testGetRSGroupInfo() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("getRSGroupInfo");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testGetRSGroupInfoOfTable() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("getRSGroupInfoOfTable");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testMoveServers() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("moveServers");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testMoveTables() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("moveTables");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testAddRSGroup() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("addRSGroup");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testRemoveRSGroup() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("removeRSGroup");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testBalanceRSGroup() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("balanceRSGroup");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testListRSGroup() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("listRSGroup");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testGetRSGroupInfoOfServer() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("getRSGroupInfoOfServer");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+
+  @Test
+  public void testMoveServersAndTables() throws Exception {
+    AccessTestAction action = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        rsGroupAdminEndpoint.checkPermission("moveServersAndTables");
+        return null;
+      }
+    };
+
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO,
+        USER_NONE, USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index b86a9f5..08a2033 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -190,6 +190,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
 import 
org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.AccessController;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -1669,7 +1670,7 @@ public class MasterRpcServices extends RSRpcServices
       // The AccessController can provide AUTHORIZATION and CELL_AUTHORIZATION
       if (master.cpHost != null &&
             master.cpHost.findCoprocessor(AccessController.class.getName()) != 
null) {
-        if 
(AccessController.isAuthorizationSupported(master.getConfiguration())) {
+        if (AccessChecker.isAuthorizationSupported(master.getConfiguration())) 
{
           capabilities.add(Capability.AUTHORIZATION);
         }
         if 
(AccessController.isCellAuthorizationSupported(master.getConfiguration())) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
new file mode 100644
index 0000000..6afc96d
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public final class AccessChecker {
+  private static final Logger AUDITLOG =
+      LoggerFactory.getLogger("SecurityLogger." + 
AccessChecker.class.getName());
+  private TableAuthManager authManager;
+  /**
+   * Whether authorization is enforced. Usually false; only true if
+   * "hbase.security.authorization" has been set to true in the site
+   * configuration. See HBASE-19483.
+   */
+  private boolean authorizationEnabled;
+
+  public static boolean isAuthorizationSupported(Configuration conf) {
+    return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, false);
+  }
+
+  /**
+   * Constructor with existing configuration
+   *
+   * @param conf Existing configuration to use
+   * @param zkw reference to the {@link ZooKeeperWatcher}
+   */
+  public AccessChecker(final Configuration conf, final ZooKeeperWatcher zkw)
+      throws RuntimeException {
+    // If zk is null, or an IOException occurs while obtaining the auth manager,
+    // throw a RuntimeException so that the coprocessor is unloaded.
+    if (zkw != null) {
+      try {
+        this.authManager = TableAuthManager.getOrCreate(zkw, conf);
+      } catch (IOException ioe) {
+        throw new RuntimeException("Error obtaining AccessChecker", ioe);
+      }
+    } else {
+      throw new NullPointerException("Error obtaining AccessChecker, zk found 
null.");
+    }
+    authorizationEnabled = isAuthorizationSupported(conf);
+  }
+
+  public TableAuthManager getAuthManager() {
+    return authManager;
+  }
+
+  public void logResult(AuthResult result) {
+    if (AUDITLOG.isTraceEnabled()) {
+      InetAddress remoteAddr = RpcServer.getRemoteAddress();
+      AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
+          " for user " + (result.getUser() != null ? 
result.getUser().getShortName() : "UNKNOWN") +
+          "; reason: " + result.getReason() +
+          "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
+          "; request: " + result.getRequest() +
+          "; context: " + result.toContextString());
+    }
+  }
+
+  /**
+   * Authorizes that the given user has any of the given permissions for the
+   * given table, column family and column qualifier.
+   * @param tableName Table requested
+   * @param family Column family requested
+   * @param qualifier Column qualifier requested
+   * @throws IOException if obtaining the current user fails
+   * @throws AccessDeniedException if authorization is enabled and access is denied
+   */
+  public void requirePermission(User user, String request, TableName 
tableName, byte[] family,
+      byte[] qualifier, Action... permissions) throws IOException {
+    AuthResult result = null;
+
+    for (Action permission : permissions) {
+      if (authManager.authorize(user, tableName, family, qualifier, 
permission)) {
+        result = AuthResult.allow(request, "Table permission granted", user,
+            permission, tableName, family, qualifier);
+        break;
+      } else {
+        // rest of the world
+        result = AuthResult.deny(request, "Insufficient permissions", user,
+            permission, tableName, family, qualifier);
+      }
+    }
+    logResult(result);
+    if (authorizationEnabled && !result.isAllowed()) {
+      throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
+    }
+  }
+
+  /**
+   * Authorizes that the given user has any of the given permissions on the
+   * given table. Family and qualifier are only recorded on the auth result.
+   * @param tableName Table requested
+   * @param family Column family param (recorded, not part of the check)
+   * @param qualifier Column qualifier param (recorded, not part of the check)
+   * @throws IOException if obtaining the current user fails
+   * @throws AccessDeniedException if authorization is enabled and access is denied
+   */
+  public void requireTablePermission(User user, String request, TableName 
tableName, byte[] family,
+      byte[] qualifier, Action... permissions) throws IOException {
+    AuthResult result = null;
+
+    for (Action permission : permissions) {
+      if (authManager.authorize(user, tableName, null, null, permission)) {
+        result = AuthResult.allow(request, "Table permission granted", user,
+            permission, tableName, null, null);
+        result.getParams().setFamily(family).setQualifier(qualifier);
+        break;
+      } else {
+        // rest of the world
+        result = AuthResult.deny(request, "Insufficient permissions", user,
+            permission, tableName, family, qualifier);
+        result.getParams().setFamily(family).setQualifier(qualifier);
+      }
+    }
+    logResult(result);
+    if (authorizationEnabled && !result.isAllowed()) {
+      throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
+    }
+  }
+
+  /**
+   * Authorizes that the given user has any of the given permissions to
+   * access the table.
+   *
+   * @param tableName Table requested
+   * @param permissions Actions being requested
+   * @throws IOException if obtaining the current user fails
+   * @throws AccessDeniedException if user has no authorization
+   */
+  public void requireAccess(User user, String request, TableName tableName,
+      Action... permissions) throws IOException {
+    AuthResult result = null;
+
+    for (Action permission : permissions) {
+      if (authManager.hasAccess(user, tableName, permission)) {
+        result = AuthResult.allow(request, "Table permission granted", user,
+            permission, tableName, null, null);
+        break;
+      } else {
+        // rest of the world
+        result = AuthResult.deny(request, "Insufficient permissions", user,
+            permission, tableName, null, null);
+      }
+    }
+    logResult(result);
+    if (authorizationEnabled && !result.isAllowed()) {
+      throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
+    }
+  }
+
+  /**
+   * Authorizes that the given user has global privileges for the given
+   * action.
+   * @param perm The action being requested
+   * @throws IOException if obtaining the current user fails
+   * @throws AccessDeniedException if authorization is denied
+   */
+  public void requirePermission(User user, String request, Action perm) throws 
IOException {
+    requireGlobalPermission(user, request, perm, null, null);
+  }
+
+  /**
+   * Checks that the user has the given global permission. The generated
+   * audit log message will contain context information for the operation
+   * being authorized, based on the given parameters.
+   * @param perm Action being requested
+   * @param tableName Affected table name.
+   * @param familyMap Affected column families.
+   */
+  public void requireGlobalPermission(User user, String request, Action perm, 
TableName tableName,
+      Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
+    AuthResult result = null;
+    if (authManager.authorize(user, perm)) {
+      result = AuthResult.allow(request, "Global check allowed", user, perm, 
tableName, familyMap);
+      result.getParams().setTableName(tableName).setFamilies(familyMap);
+      logResult(result);
+    } else {
+      result = AuthResult.deny(request, "Global check failed", user, perm, 
tableName, familyMap);
+      result.getParams().setTableName(tableName).setFamilies(familyMap);
+      logResult(result);
+      if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions for user '" +
+            (user != null ? user.getShortName() : "null") +"' (global, 
action=" +
+            perm.toString() + ")");
+      }
+    }
+  }
+
+  /**
+   * Checks that the user has the given global permission. The generated
+   * audit log message will contain context information for the operation
+   * being authorized, based on the given parameters.
+   * @param perm Action being requested
+   * @param namespace  The given namespace
+   */
+  public void requireGlobalPermission(User user, String request, Action perm,
+      String namespace) throws IOException {
+    AuthResult authResult = null;
+    if (authManager.authorize(user, perm)) {
+      authResult = AuthResult.allow(request, "Global check allowed", user, 
perm, null);
+      authResult.getParams().setNamespace(namespace);
+      logResult(authResult);
+    } else {
+      authResult = AuthResult.deny(request, "Global check failed", user, perm, 
null);
+      authResult.getParams().setNamespace(namespace);
+      logResult(authResult);
+      if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions for user '" +
+            (user != null ? user.getShortName() : "null") +"' (global, 
action=" +
+            perm.toString() + ")");
+      }
+    }
+  }
+
+  /**
+   * Checks that the user has the given global or namespace permission.
+   * @param namespace  The given namespace
+   * @param permissions Actions being requested
+   */
+  public void requireNamespacePermission(User user, String request, String 
namespace,
+      Action... permissions) throws IOException {
+    AuthResult result = null;
+
+    for (Action permission : permissions) {
+      if (authManager.authorize(user, namespace, permission)) {
+        result = AuthResult.allow(request, "Namespace permission granted",
+            user, permission, namespace);
+        break;
+      } else {
+        // rest of the world
+        result = AuthResult.deny(request, "Insufficient permissions", user,
+            permission, namespace);
+      }
+    }
+    logResult(result);
+    if (authorizationEnabled && !result.isAllowed()) {
+      throw new AccessDeniedException("Insufficient permissions "
+          + result.toContextString());
+    }
+  }
+
+  /**
+   * Checks that the user has the given global or namespace permission.
+   * @param namespace   The given namespace
+   * @param permissions Actions being requested
+   */
+  public void requireNamespacePermission(User user, String request, String 
namespace, TableName tableName,
+      Map<byte[], ? extends Collection<byte[]>> familyMap, Action... 
permissions)
+      throws IOException {
+    AuthResult result = null;
+
+    for (Action permission : permissions) {
+      if (authManager.authorize(user, namespace, permission)) {
+        result = AuthResult.allow(request, "Namespace permission granted",
+            user, permission, namespace);
+        result.getParams().setTableName(tableName).setFamilies(familyMap);
+        break;
+      } else {
+        // rest of the world
+        result = AuthResult.deny(request, "Insufficient permissions", user,
+            permission, namespace);
+        result.getParams().setTableName(tableName).setFamilies(familyMap);
+      }
+    }
+    logResult(result);
+    if (authorizationEnabled && !result.isAllowed()) {
+      throw new AccessDeniedException("Insufficient permissions "
+          + result.toContextString());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f1dd258/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
index a2ac927..57c0f7b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -510,7 +511,8 @@ public class AccessControlLists {
     return getPermissions(conf, tableName != null ? tableName.getName() : 
null, null);
   }
 
-  static ListMultimap<String, TablePermission> 
getNamespacePermissions(Configuration conf,
+  @VisibleForTesting
+  public static ListMultimap<String, TablePermission> 
getNamespacePermissions(Configuration conf,
         String namespace) throws IOException {
     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), 
null);
   }

Reply via email to