This is an automated email from the ASF dual-hosted git repository.

linaataustin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sentry.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b53680  SENTRY-2535: SentryKafkaAuthorizer throws Exception when 
describing ACLs (Gergo Wilder, reviewed by Kalyan Kumar Kalvagadda)
8b53680 is described below

commit 8b536800d20b70b6aa286cb1e7c6decec645a6af
Author: lina.li <[email protected]>
AuthorDate: Thu Nov 7 11:33:48 2019 -0600

    SENTRY-2535: SentryKafkaAuthorizer throws Exception when describing ACLs 
(Gergo Wilder, reviewed by Kalyan Kumar Kalvagadda)
---
 .../java/org/apache/sentry/kafka/ConvertUtil.java  | 15 +++++--
 .../sentry/kafka/binding/KafkaAuthBinding.java     |  9 ++--
 .../sentry/kafka/authorizer/ConvertUtilTest.java   | 51 +++++++++++++++++++++-
 .../sentry/tests/e2e/kafka/TestAclsCrud.java       | 46 +++++++++++++++++++
 4 files changed, 111 insertions(+), 10 deletions(-)

diff --git 
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
 
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
index c878308..f2a35ae 100644
--- 
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
+++ 
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
@@ -18,13 +18,13 @@ package org.apache.sentry.kafka;
 
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import kafka.security.auth.Resource;
-
+import kafka.security.auth.ResourceType$;
+import org.apache.sentry.api.generic.thrift.TAuthorizable;
 import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.model.kafka.Host;
-
-import com.google.common.collect.Lists;
 import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.Host;
 
 public class ConvertUtil {
 
@@ -52,4 +52,11 @@ public class ConvertUtil {
     return authorizables;
   }
 
+  public static Resource convertAuthorizableToResource(TAuthorizable 
tAuthorizable) {
+    // Kafka's GROUP resource is referred as CONSUMERGROUP within Sentry.
+    String authorizableType = 
tAuthorizable.getType().equalsIgnoreCase("consumergroup") ? "group" : 
tAuthorizable.getType();
+    Resource resource = new 
Resource(ResourceType$.MODULE$.fromString(authorizableType), 
tAuthorizable.getName());
+
+    return resource;
+  }
 }
diff --git 
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
 
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 07b21b9..eba9e0b 100644
--- 
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ 
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -24,17 +24,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import kafka.security.auth.Acl;
 import kafka.security.auth.Allow;
 import kafka.security.auth.Allow$;
 import kafka.security.auth.Operation$;
-import kafka.security.auth.ResourceType$;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Sets;
 import kafka.network.RequestChannel;
 import kafka.security.auth.Operation;
 import kafka.security.auth.Resource;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.kafka.common.KafkaException;
@@ -478,7 +476,8 @@ public class KafkaAuthBinding {
           if 
(tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name()))
 {
             host = tAuthorizable.getName();
           } else {
-            Resource resource = new 
Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), 
tAuthorizable.getName());
+            Resource resource = 
ConvertUtil.convertAuthorizableToResource(tAuthorizable);
+
             if (operation.equals("*")) {
               operation = "All";
             }
diff --git 
a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
 
b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
index 494e212..e6adc02 100644
--- 
a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
+++ 
b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
@@ -20,8 +20,9 @@ import junit.framework.Assert;
 import kafka.security.auth.Resource;
 import kafka.security.auth.Resource$;
 import kafka.security.auth.ResourceType$;
+import org.apache.sentry.api.generic.thrift.TAuthorizable;
 import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.*;
 import org.apache.sentry.kafka.ConvertUtil;
 import org.junit.Test;
 
@@ -100,4 +101,52 @@ public class ConvertUtilTest {
     }
     Assert.assertEquals(authorizables.size(), 2);
   }
+
+  @Test
+  public void testConvertClusterAuthorizableToResource() {
+    Cluster cluster = new Cluster();
+
+    TAuthorizable authorizable = new TAuthorizable(cluster.getTypeName(), 
cluster.getName());
+
+    Resource actualResource = 
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+    Assert.assertEquals(ResourceType$.MODULE$.fromString("cluster"), 
actualResource.resourceType());
+    Assert.assertEquals(Resource.ClusterResourceName(), actualResource.name());
+  }
+
+  @Test
+  public void testConvertTopicAuthorizableToResource() {
+    Topic topic = new Topic("testTopic");
+
+    TAuthorizable authorizable = new TAuthorizable(topic.getTypeName(), 
topic.getName());
+
+    Resource actualResource = 
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+    Assert.assertEquals(ResourceType$.MODULE$.fromString("topic"), 
actualResource.resourceType());
+    Assert.assertEquals("testTopic", actualResource.name());
+  }
+
+  @Test
+  public void testConvertTransactionalIdAuthorizableToResource() {
+    TransactionalId transactionalId = new 
TransactionalId("testTransactionalId");
+
+    TAuthorizable authorizable = new 
TAuthorizable(transactionalId.getTypeName(), transactionalId.getName());
+
+    Resource actualResource = 
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+    Assert.assertEquals(ResourceType$.MODULE$.fromString("transactionalId"), 
actualResource.resourceType());
+    Assert.assertEquals("testTransactionalId", actualResource.name());
+  }
+
+  @Test
+  public void testConvertConsumerGroupAuthorizableToResource() {
+    ConsumerGroup comsumerGroup = new ConsumerGroup("testConsumerGroup");
+
+    TAuthorizable authorizable = new 
TAuthorizable(comsumerGroup.getTypeName(), comsumerGroup.getName());
+
+    Resource actualResource = 
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+    Assert.assertEquals(ResourceType$.MODULE$.fromString("group"), 
actualResource.resourceType());
+    Assert.assertEquals("testConsumerGroup", actualResource.name());
+  }
 }
diff --git 
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
 
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
index ac17f36..841a20d 100644
--- 
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
+++ 
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
@@ -327,4 +327,50 @@ public class TestAclsCrud extends 
AbstractKafkaSentryTestBase {
     Assert.assertTrue("Obtained acl does not match expected acl for 
resource.", obtainedAcls.get(resource).get().contains(acl01));
     Assert.assertTrue("Obtained acl does not match expected acl for 
resource2.", obtainedAcls.get(resource2).get().contains(acl2));
   }
+
+  @Test
+  public void testGetAcls() {
+    sentryKafkaAuthorizer = new SentryKafkaAuthorizer();
+    java.util.Map<String, String> configs = new HashMap<>();
+    configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + 
sentrySitePath.getAbsolutePath());
+    sentryKafkaAuthorizer.configure(configs);
+
+    final String role1 = "role1";
+    final Resource topicResource = new 
Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-topic");
+    final Resource consumerGroupResource = new 
Resource(ResourceType$.MODULE$.fromString("GROUP"), "test-consumergroup");
+    final Resource transactionalIdResource = new 
Resource(ResourceType$.MODULE$.fromString("TRANSACTIONALID"), 
"test-transactionalId");
+    final Resource clusterResource = new 
Resource(ResourceType$.MODULE$.fromString("CLUSTER"), "test-cluster");
+
+    // Add role
+    try {
+      sentryKafkaAuthorizer.addRole(role1);
+    } catch (Exception ex) {
+      Assert.fail("Failed to create role.");
+    }
+
+    // Add acl for topic
+    Set<Acl> acls = new HashSet<>();
+    final KafkaPrincipal principal1 = new KafkaPrincipal("role", role1);
+    final Acl acl = new Acl(principal1,
+        Allow$.MODULE$,
+        "127.0.0.1",
+        Operation$.MODULE$.fromString("READ"));
+    acls.add(acl);
+    scala.collection.immutable.Set<Acl> aclsScala = 
scala.collection.JavaConversions.asScalaSet(acls).toSet();
+
+    try {
+      sentryKafkaAuthorizer.addAcls(aclsScala, topicResource);
+      sentryKafkaAuthorizer.addAcls(aclsScala, consumerGroupResource);
+      sentryKafkaAuthorizer.addAcls(aclsScala, transactionalIdResource);
+      sentryKafkaAuthorizer.addAcls(aclsScala, clusterResource);
+    } catch (Exception ex) {
+      Assert.fail("Failed to add acls.");
+    }
+
+    final Map<Resource, scala.collection.immutable.Set<Acl>> obtainedAcls = 
sentryKafkaAuthorizer.getAcls(principal1);
+    Assert.assertTrue("Obtained acls must contain acl for topic resource.", 
obtainedAcls.keySet().contains(topicResource));
+    Assert.assertTrue("Obtained acls must contain acl for consumer group 
resource.", obtainedAcls.keySet().contains(consumerGroupResource));
+    Assert.assertTrue("Obtained acls must contain acl for cluster resource.", 
obtainedAcls.keySet().contains(clusterResource));
+    Assert.assertTrue("Obtained acls must contain acl for transactionalid 
resource.", obtainedAcls.keySet().contains(transactionalIdResource));
+  }
 }
\ No newline at end of file

Reply via email to