This is an automated email from the ASF dual-hosted git repository.
linaataustin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sentry.git
The following commit(s) were added to refs/heads/master by this push:
new 8b53680 SENTRY-2535: SentryKafkaAuthorizer throws Exception when
describing ACLs (Gergo Wilder, reviewed by Kalyan Kumar Kalvagadda)
8b53680 is described below
commit 8b536800d20b70b6aa286cb1e7c6decec645a6af
Author: lina.li <[email protected]>
AuthorDate: Thu Nov 7 11:33:48 2019 -0600
SENTRY-2535: SentryKafkaAuthorizer throws Exception when describing ACLs
(Gergo Wilder, reviewed by Kalyan Kumar Kalvagadda)
---
.../java/org/apache/sentry/kafka/ConvertUtil.java | 15 +++++--
.../sentry/kafka/binding/KafkaAuthBinding.java | 9 ++--
.../sentry/kafka/authorizer/ConvertUtilTest.java | 51 +++++++++++++++++++++-
.../sentry/tests/e2e/kafka/TestAclsCrud.java | 46 +++++++++++++++++++
4 files changed, 111 insertions(+), 10 deletions(-)
diff --git
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
index c878308..f2a35ae 100644
---
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
+++
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java
@@ -18,13 +18,13 @@ package org.apache.sentry.kafka;
import java.util.List;
+import com.google.common.collect.Lists;
import kafka.security.auth.Resource;
-
+import kafka.security.auth.ResourceType$;
+import org.apache.sentry.api.generic.thrift.TAuthorizable;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.model.kafka.Host;
-
-import com.google.common.collect.Lists;
import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.Host;
public class ConvertUtil {
@@ -52,4 +52,11 @@ public class ConvertUtil {
return authorizables;
}
+ public static Resource convertAuthorizableToResource(TAuthorizable
tAuthorizable) {
+ // Kafka's GROUP resource is referred as CONSUMERGROUP within Sentry.
+ String authorizableType =
tAuthorizable.getType().equalsIgnoreCase("consumergroup") ? "group" :
tAuthorizable.getType();
+ Resource resource = new
Resource(ResourceType$.MODULE$.fromString(authorizableType),
tAuthorizable.getName());
+
+ return resource;
+ }
}
diff --git
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 07b21b9..eba9e0b 100644
---
a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++
b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -24,17 +24,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import com.google.common.collect.Sets;
import kafka.security.auth.Acl;
import kafka.security.auth.Allow;
import kafka.security.auth.Allow$;
import kafka.security.auth.Operation$;
-import kafka.security.auth.ResourceType$;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Sets;
import kafka.network.RequestChannel;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.KafkaException;
@@ -478,7 +476,8 @@ public class KafkaAuthBinding {
if
(tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name()))
{
host = tAuthorizable.getName();
} else {
- Resource resource = new
Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()),
tAuthorizable.getName());
+ Resource resource =
ConvertUtil.convertAuthorizableToResource(tAuthorizable);
+
if (operation.equals("*")) {
operation = "All";
}
diff --git
a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
index 494e212..e6adc02 100644
---
a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
+++
b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java
@@ -20,8 +20,9 @@ import junit.framework.Assert;
import kafka.security.auth.Resource;
import kafka.security.auth.Resource$;
import kafka.security.auth.ResourceType$;
+import org.apache.sentry.api.generic.thrift.TAuthorizable;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.*;
import org.apache.sentry.kafka.ConvertUtil;
import org.junit.Test;
@@ -100,4 +101,52 @@ public class ConvertUtilTest {
}
Assert.assertEquals(authorizables.size(), 2);
}
+
+ @Test
+ public void testConvertClusterAuthorizableToResource() {
+ Cluster cluster = new Cluster();
+
+ TAuthorizable authorizable = new TAuthorizable(cluster.getTypeName(),
cluster.getName());
+
+ Resource actualResource =
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+ Assert.assertEquals(ResourceType$.MODULE$.fromString("cluster"),
actualResource.resourceType());
+ Assert.assertEquals(Resource.ClusterResourceName(), actualResource.name());
+ }
+
+ @Test
+ public void testConvertTopicAuthorizableToResource() {
+ Topic topic = new Topic("testTopic");
+
+ TAuthorizable authorizable = new TAuthorizable(topic.getTypeName(),
topic.getName());
+
+ Resource actualResource =
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+ Assert.assertEquals(ResourceType$.MODULE$.fromString("topic"),
actualResource.resourceType());
+ Assert.assertEquals("testTopic", actualResource.name());
+ }
+
+ @Test
+ public void testConvertTransactionalIdAuthorizableToResource() {
+ TransactionalId transactionalId = new
TransactionalId("testTransactionalId");
+
+ TAuthorizable authorizable = new
TAuthorizable(transactionalId.getTypeName(), transactionalId.getName());
+
+ Resource actualResource =
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+ Assert.assertEquals(ResourceType$.MODULE$.fromString("transactionalId"),
actualResource.resourceType());
+ Assert.assertEquals("testTransactionalId", actualResource.name());
+ }
+
+ @Test
+ public void testConvertConsumerGroupAuthorizableToResource() {
+ ConsumerGroup comsumerGroup = new ConsumerGroup("testConsumerGroup");
+
+ TAuthorizable authorizable = new
TAuthorizable(comsumerGroup.getTypeName(), comsumerGroup.getName());
+
+ Resource actualResource =
ConvertUtil.convertAuthorizableToResource(authorizable);
+
+ Assert.assertEquals(ResourceType$.MODULE$.fromString("group"),
actualResource.resourceType());
+ Assert.assertEquals("testConsumerGroup", actualResource.name());
+ }
}
diff --git
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
index ac17f36..841a20d 100644
---
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
+++
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
@@ -327,4 +327,50 @@ public class TestAclsCrud extends
AbstractKafkaSentryTestBase {
Assert.assertTrue("Obtained acl does not match expected acl for
resource.", obtainedAcls.get(resource).get().contains(acl01));
Assert.assertTrue("Obtained acl does not match expected acl for
resource2.", obtainedAcls.get(resource2).get().contains(acl2));
}
+
+ @Test
+ public void testGetAcls() {
+ sentryKafkaAuthorizer = new SentryKafkaAuthorizer();
+ java.util.Map<String, String> configs = new HashMap<>();
+ configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" +
sentrySitePath.getAbsolutePath());
+ sentryKafkaAuthorizer.configure(configs);
+
+ final String role1 = "role1";
+ final Resource topicResource = new
Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-topic");
+ final Resource consumerGroupResource = new
Resource(ResourceType$.MODULE$.fromString("GROUP"), "test-consumergroup");
+ final Resource transactionalIdResource = new
Resource(ResourceType$.MODULE$.fromString("TRANSACTIONALID"),
"test-transactionalId");
+ final Resource clusterResource = new
Resource(ResourceType$.MODULE$.fromString("CLUSTER"), "test-cluster");
+
+ // Add role
+ try {
+ sentryKafkaAuthorizer.addRole(role1);
+ } catch (Exception ex) {
+ Assert.fail("Failed to create role.");
+ }
+
+ // Add acl for topic
+ Set<Acl> acls = new HashSet<>();
+ final KafkaPrincipal principal1 = new KafkaPrincipal("role", role1);
+ final Acl acl = new Acl(principal1,
+ Allow$.MODULE$,
+ "127.0.0.1",
+ Operation$.MODULE$.fromString("READ"));
+ acls.add(acl);
+ scala.collection.immutable.Set<Acl> aclsScala =
scala.collection.JavaConversions.asScalaSet(acls).toSet();
+
+ try {
+ sentryKafkaAuthorizer.addAcls(aclsScala, topicResource);
+ sentryKafkaAuthorizer.addAcls(aclsScala, consumerGroupResource);
+ sentryKafkaAuthorizer.addAcls(aclsScala, transactionalIdResource);
+ sentryKafkaAuthorizer.addAcls(aclsScala, clusterResource);
+ } catch (Exception ex) {
+ Assert.fail("Failed to add acls.");
+ }
+
+ final Map<Resource, scala.collection.immutable.Set<Acl>> obtainedAcls =
sentryKafkaAuthorizer.getAcls(principal1);
+ Assert.assertTrue("Obtained acls must contain acl for topic resource.",
obtainedAcls.keySet().contains(topicResource));
+ Assert.assertTrue("Obtained acls must contain acl for consumer group
resource.", obtainedAcls.keySet().contains(consumerGroupResource));
+ Assert.assertTrue("Obtained acls must contain acl for cluster resource.",
obtainedAcls.keySet().contains(clusterResource));
+ Assert.assertTrue("Obtained acls must contain acl for transactionalid
resource.", obtainedAcls.keySet().contains(transactionalIdResource));
+ }
}
\ No newline at end of file