HBASE-14425 In Secure Zookeeper cluster superuser will not have sufficient 
permission if multiple values are configured in hbase.superuser (Pankaj Kumar)

Conflicts:
        hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c174a54d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c174a54d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c174a54d

Branch: refs/heads/branch-1
Commit: c174a54d87e14edd8c7cf039fecd4ce40066521b
Parents: d7c1468
Author: Enis Soztutar <e...@apache.org>
Authored: Tue Oct 27 16:56:20 2015 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Tue Oct 27 17:00:46 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   | 20 +++++++--
 .../hbase/zookeeper/ZooKeeperWatcher.java       | 46 +++++++++++++++++++-
 .../hadoop/hbase/zookeeper/TestZKUtil.java      | 24 +++++++++-
 .../test/IntegrationTestZKAndFSPermissions.java |  3 +-
 4 files changed, 86 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c174a54d/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index c4c9819..ffbe2db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -1030,11 +1032,23 @@ public class ZKUtil {
       return Ids.OPEN_ACL_UNSAFE;
     }
     if (isSecureZooKeeper) {
-      String superUser = zkw.getConfiguration().get("hbase.superuser");
       ArrayList<ACL> acls = new ArrayList<ACL>();
       // add permission to hbase supper user
-      if (superUser != null) {
-        acls.add(new ACL(Perms.ALL, new Id("auth", superUser)));
+      String[] superUsers = zkw.getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY);
+      if (superUsers != null) {
+        List<String> groups = new ArrayList<String>();
+        for (String user : superUsers) {
+          if (user.startsWith(AuthUtil.GROUP_PREFIX)) {
+            // TODO: Set node ACL for groups when ZK supports this feature
+            groups.add(user);
+          } else {
+            acls.add(new ACL(Perms.ALL, new Id("auth", user)));
+          }
+        }
+        if (!groups.isEmpty()) {
+          LOG.warn("Znode ACL setting for group " + groups
+              + " is skipped, Zookeeper doesn't support this feature presently.");
+        }
       }
       // Certain znodes are accessed directly by the client,
       // so they must be readable by non-authenticated clients

http://git-wip-us.apache.org/repos/asf/hbase/blob/c174a54d/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 27b2da3..9401a62 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -32,9 +32,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -269,7 +272,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
    * @throws IOException
    */
   private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
-    String superUser = conf.get("hbase.superuser");
+    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
+    // Check whether ACL set for all superusers
+    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
+      return false;
+    }
 
     // this assumes that current authenticated user is the same as zookeeper client user
     // configured via JAAS
@@ -288,7 +295,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
         if (perms != Perms.READ) {
           return false;
         }
-      } else if (superUser != null && new Id("sasl", superUser).equals(id)) {
+      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
         if (perms != Perms.ALL) {
           return false;
         }
@@ -302,6 +309,41 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
     }
     return true;
   }
+  
+  /*
+   * Validate whether ACL set for all superusers.
+   */
+  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
+    for (String user : superUsers) {
+      boolean hasAccess = false;
+      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
+      if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
+        for (ACL acl : acls) {
+          if (user.equals(acl.getId().getId()) && acl.getPerms() == Perms.ALL) {
+            hasAccess = true;
+            break;
+          }
+        }
+        if (!hasAccess) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+  
+  /*
+   * Validate whether ACL ID is superuser.
+   */
+  public static boolean isSuperUserId(String[] superUsers, Id id) {
+    for (String user : superUsers) {
+      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
+      if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
+        return true;
+      }
+    }
+    return false;
+  }
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c174a54d/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 7224507..72de935 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -18,11 +18,18 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -43,4 +50,19 @@ public class TestZKUtil {
     Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
     Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
   }
+  
+  @Test
+  public void testCreateACL() throws ZooKeeperConnectionException, IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3");
+    String node = "/hbase/testCreateACL";
+    ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, node, null, false);
+    List<ACL> aclList = ZKUtil.createACL(watcher, node, true);
+    Assert.assertEquals(aclList.size(), 4); // 3+1, since ACL will be set for the creator by default
+    Assert.assertTrue(!aclList.contains(new ACL(Perms.ALL, new Id("auth", "@group1")))
+        && !aclList.contains(new ACL(Perms.ALL, new Id("auth", "@group2"))));
+    Assert.assertTrue(aclList.contains(new ACL(Perms.ALL, new Id("auth", "user1")))
+        && aclList.contains(new ACL(Perms.ALL, new Id("auth", "user2")))
+        && aclList.contains(new ACL(Perms.ALL, new Id("auth", "user3"))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c174a54d/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
index c39056d..3845846 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
@@ -178,6 +178,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
       boolean expectedWorldReadable) throws KeeperException, InterruptedException {
     Stat stat = new Stat();
     List<ACL> acls = zk.getZooKeeper().getACL(znode, stat);
+    String[] superUsers = superUser == null ? null : superUser.split(",");
 
     LOG.info("Checking ACLs for znode znode:" + znode + " acls:" + acls);
 
@@ -191,7 +192,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
         assertTrue(expectedWorldReadable);
         // assert that anyone can only read
         assertEquals(perms, Perms.READ);
-      } else if (superUser != null && new Id("sasl", superUser).equals(id)) {
+      } else if (superUsers != null && ZooKeeperWatcher.isSuperUserId(superUsers, id)) {
         // assert that super user has all the permissions
         assertEquals(perms, Perms.ALL);
       } else if (new Id("sasl", masterPrincipal).equals(id)) {

Reply via email to