[ https://issues.apache.org/jira/browse/KAFKA-7028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527612#comment-16527612 ]
ASF GitHub Bot commented on KAFKA-7028: --------------------------------------- rajinisivaram closed pull request #5311: KAFKA-7028: Properly authorize custom principal objects URL: https://github.com/apache/kafka/pull/5311 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 1dc3a1fb412..c5bbfdf41a9 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -110,7 +110,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging { throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.patternType) } - val principal = session.principal + // ensure we compare identical classes + val sessionPrincipal = session.principal + val principal = if (classOf[KafkaPrincipal] != sessionPrincipal.getClass) + new KafkaPrincipal(sessionPrincipal.getPrincipalType, sessionPrincipal.getName) + else + sessionPrincipal + val host = session.clientAddress.getHostAddress def isEmptyAclAndAuthorized(acls: Set[Acl]): Boolean = { diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 7ab3c0ad71a..3d1ceb6ddd3 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -54,6 +54,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { private var config: KafkaConfig = _ private var zooKeeperClient: ZooKeeperClient = _ + class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) { + override def equals(o: scala.Any): Boolean = false + } + @Before override def setUp() { super.setUp() @@ -139,6 +143,29 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertTrue("User3 should have WRITE access from host2", simpleAclAuthorizer.authorize(user3Session, Write, resource)) } + /** + CustomPrincipals should be compared with their principal type and name + */ + @Test + def testAllowAccessWithCustomPrincipal() { + val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username) + val host1 = InetAddress.getByName("192.168.1.1") + val host2 = InetAddress.getByName("192.168.1.2") + + // user has READ access from host2 but not from host1 + val acl1 = new Acl(user, Deny, host1.getHostAddress, Read) + val acl2 = new Acl(user, Allow, host2.getHostAddress, Read) + val acls = Set[Acl](acl1, acl2) + changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl]) + + val host1Session = Session(customUserPrincipal, host1) + val host2Session = Session(customUserPrincipal, host2) + + assertTrue("User1 should have READ access from host2", simpleAclAuthorizer.authorize(host2Session, Read, resource)) + assertFalse("User1 should not have READ access from host1 due to denyAcl", simpleAclAuthorizer.authorize(host1Session, Read, resource)) + } + @Test def testDenyTakesPrecedence() { val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) @@ -177,6 +204,19 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Read, resource)) } + /** + CustomPrincipals should be compared with their principal type and name + */ + @Test + def testSuperUserWithCustomPrincipalHasAccess(): Unit = { + val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All) + changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl]) + + val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4")) + + assertTrue("superuser with custom principal always has access, no matter what acls.", simpleAclAuthorizer.authorize(session, Read, resource)) + } + @Test def testWildCardAcls(): Unit = { assertFalse("when acls = [], authorizer should fail close.", simpleAclAuthorizer.authorize(session, Read, resource)) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > super.users doesn't work with custom principals > ----------------------------------------------- > > Key: KAFKA-7028 > URL: https://issues.apache.org/jira/browse/KAFKA-7028 > Project: Kafka > Issue Type: Bug > Reporter: Ismael Juma > Assignee: Stanislav Kozlovski > Priority: Major > Fix For: 2.1.0 > > > SimpleAclAuthorizer creates a KafkaPrincipal for the users defined in the > super.users broker config. However, it should use the configured > KafkaPrincipalBuilder so that it works with a custom defined one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)