[ 
https://issues.apache.org/jira/browse/KAFKA-7028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527612#comment-16527612
 ] 

ASF GitHub Bot commented on KAFKA-7028:
---------------------------------------

rajinisivaram closed pull request #5311: KAFKA-7028: Properly authorize custom 
principal objects
URL: https://github.com/apache/kafka/pull/5311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 1dc3a1fb412..c5bbfdf41a9 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -110,7 +110,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       throw new IllegalArgumentException("Only literal resources are 
supported. Got: " + resource.patternType)
     }
 
-    val principal = session.principal
+    // ensure we compare identical classes
+    val sessionPrincipal = session.principal
+    val principal = if (classOf[KafkaPrincipal] != sessionPrincipal.getClass)
+      new KafkaPrincipal(sessionPrincipal.getPrincipalType, 
sessionPrincipal.getName)
+    else
+      sessionPrincipal
+
     val host = session.clientAddress.getHostAddress
 
     def isEmptyAclAndAuthorized(acls: Set[Acl]): Boolean = {
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 7ab3c0ad71a..3d1ceb6ddd3 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -54,6 +54,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   private var config: KafkaConfig = _
   private var zooKeeperClient: ZooKeeperClient = _
 
+  class CustomPrincipal(principalType: String, name: String) extends 
KafkaPrincipal(principalType, name) {
+    override def equals(o: scala.Any): Boolean = false
+  }
+
   @Before
   override def setUp() {
     super.setUp()
@@ -139,6 +143,29 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness 
{
     assertTrue("User3 should have WRITE access from host2", 
simpleAclAuthorizer.authorize(user3Session, Write, resource))
   }
 
+  /**
+    CustomPrincipals should be compared with their principal type and name
+   */
+  @Test
+  def testAllowAccessWithCustomPrincipal() {
+    val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, 
username)
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val host2 = InetAddress.getByName("192.168.1.2")
+
+    // user has READ access from host2 but not from host1
+    val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
+    val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
+    val acls = Set[Acl](acl1, acl2)
+    changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
+
+    val host1Session = Session(customUserPrincipal, host1)
+    val host2Session = Session(customUserPrincipal, host2)
+
+    assertTrue("User1 should have READ access from host2", 
simpleAclAuthorizer.authorize(host2Session, Read, resource))
+    assertFalse("User1 should not have READ access from host1 due to denyAcl", 
simpleAclAuthorizer.authorize(host1Session, Read, resource))
+  }
+
   @Test
   def testDenyTakesPrecedence() {
     val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
@@ -177,6 +204,19 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness 
{
     assertTrue("superuser always has access, no matter what acls.", 
simpleAclAuthorizer.authorize(session2, Read, resource))
   }
 
+  /**
+    CustomPrincipals should be compared with their principal type and name
+   */
+  @Test
+  def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
+    val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
+    changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
+
+    val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, 
"superuser1"), InetAddress.getByName("192.0.4.4"))
+
+    assertTrue("superuser with custom principal always has access, no matter 
what acls.", simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
   @Test
   def testWildCardAcls(): Unit = {
     assertFalse("when acls = [],  authorizer should fail close.", 
simpleAclAuthorizer.authorize(session, Read, resource))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> super.users doesn't work with custom principals
> -----------------------------------------------
>
>                 Key: KAFKA-7028
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7028
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Ismael Juma
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>             Fix For: 2.1.0
>
>
> SimpleAclAuthorizer creates a KafkaPrincipal for the users defined in the 
> super.users broker config. However, it should use the configured 
> KafkaPrincipalBuilder so that it works with a custom defined one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to