rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539594382



##########
File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;

Review comment:
       This package is part of the public API, but the class looks like it 
should be internal?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[mutable.HashSet[String]] = {
+    var matched = List[mutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {

Review comment:
       `AclEntry.WildcardPrincipalString`

##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    if (denyAllResource(requestContext, op, resourceType)) {
+      AuthorizationResult.DENIED
+    } else if (shouldAllowEveryoneIfNoAclIsFound) {
+      AuthorizationResult.ALLOWED
+    } else {
+      super.authorizeByResourceType(requestContext, op, resourceType)
+    }
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+                      op: AclOperation,
+                      resourceType: ResourceType): Boolean = {
+    val resourceTypeFilter = new ResourcePatternFilter(
+      resourceType, null, PatternType.ANY)
+    val principal = new 
KafkaPrincipal(requestContext.principal.getPrincipalType, 
requestContext.principal.getName)
+    val accessControlEntry = new AccessControlEntryFilter(
+      principal.toString, requestContext.clientAddress().getHostAddress, op, 
AclPermissionType.DENY)

Review comment:
       Does this work with an ACL with wildcard host?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare

Review comment:
       Can we remove the TODO comments?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)

Review comment:
       In the typical case, we have a large number of `allowLiterals` and 
`allowPrefixes`, no `denyLiterals` or `denPrefixes`. I think it would make 
sense to special case `denyLiterals.isEmpty && denyPrefixes.isEmpty`. In this 
case, we don't need to find all matching resources, we just need to check that 
there is at least one matching resource. 

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[mutable.HashSet[String]] = {

Review comment:
       Why can't this be a `Set` instead of `List of Sets`?

##########
File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourceType;
+
+import java.util.Objects;
+
+public class ResourceIndex {

Review comment:
       Perhaps ResourceAclEntry or something along those lines would be better 
than `ResourceIndex` since this class has no notion of index.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[mutable.HashSet[String]] = {
+    var matched = List[mutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val ace = new AccessControlEntry(p, h, o, permission)
+          val resourceIndex = new ResourceIndex(ace, resourceType, patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean =

Review comment:
       private def?

##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -71,15 +73,19 @@ object AuthorizerWrapper {
   }
 
   def convertToResource(resourcePattern: ResourcePattern): Resource = {
-    Resource(ResourceType.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
+    Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
   }
 }
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
 class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.Authorizer) extends Authorizer {
 
+  var shouldAllowEveryoneIfNoAclIsFound = false
+
   override def configure(configs: util.Map[String, _]): Unit = {
     baseAuthorizer.configure(configs)
+    shouldAllowEveryoneIfNoAclIsFound = configs.asScala.get(
+      
AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)

Review comment:
       We cannot do this here. `AuthorizerWrapper` is used to wrap any custom 
authorizer using the old Authorizer API. `AllowEveryoneIfNoAclIsFoundProp` is a 
custom config of `SimpleAclAuthorizer` and `AclAuthorizer`, we cannot use that 
with any custom authorizer. We should find a way to support the config for 
SimpleAclAuthorizer that doesn't impact other custom authorizers.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[mutable.HashSet[String]] = {
+    var matched = List[mutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val ace = new AccessControlEntry(p, h, o, permission)
+          val resourceIndex = new ResourceIndex(ace, resourceType, patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean =
+    denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[mutable.Set[String]], 
allowPrefixes: List[mutable.Set[String]],
+                       denyLiterals: List[mutable.Set[String]], denyPrefixes: 
List[mutable.Set[String]]): Boolean = {
+    (allowPrefixes.exists(prefixes =>
+          prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes)))
+      || allowLiterals.exists(literals =>
+            literals.exists(literal => allowLiteral(literal, denyLiterals, 
denyPrefixes))))
+  }
+
+  private def allowLiteral(literalName: String,
+                           denyLiterals: List[mutable.Set[String]], 
denyPrefixes: List[mutable.Set[String]]): Boolean = {
+    literalName match{
+      case ResourcePattern.WILDCARD_RESOURCE => true
+      case _ => (denyLiterals.forall(denyLiterals => 
!denyLiterals.contains(literalName))
+                    && !hasDominantPrefixedDeny(literalName, denyPrefixes))
+    }
+  }
+
+  private def allowPrefix(prefixName: String,
+                          denyPrefixes: List[mutable.Set[String]]): Boolean = {
+    !hasDominantPrefixedDeny(prefixName, denyPrefixes)
+  }
+
+  private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: 
List[mutable.Set[String]]): Boolean = {

Review comment:
       This method can be in SecurityUtils and shared with the default 
authorizer?

##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    if (denyAllResource(requestContext, op, resourceType)) {
+      AuthorizationResult.DENIED
+    } else if (shouldAllowEveryoneIfNoAclIsFound) {
+      AuthorizationResult.ALLOWED
+    } else {
+      super.authorizeByResourceType(requestContext, op, resourceType)
+    }
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+                      op: AclOperation,
+                      resourceType: ResourceType): Boolean = {

Review comment:
       The main logic of this could potentially be moved to SecurityUtils since 
the default Authorizer implementation, AclAuthorizer and the wrapper all do 
this.

##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    if (denyAllResource(requestContext, op, resourceType)) {
+      AuthorizationResult.DENIED
+    } else if (shouldAllowEveryoneIfNoAclIsFound) {
+      AuthorizationResult.ALLOWED
+    } else {
+      super.authorizeByResourceType(requestContext, op, resourceType)
+    }
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+                      op: AclOperation,
+                      resourceType: ResourceType): Boolean = {
+    val resourceTypeFilter = new ResourcePatternFilter(
+      resourceType, null, PatternType.ANY)
+    val principal = new 
KafkaPrincipal(requestContext.principal.getPrincipalType, 
requestContext.principal.getName)
+    val accessControlEntry = new AccessControlEntryFilter(
+      principal.toString, requestContext.clientAddress().getHostAddress, op, 
AclPermissionType.DENY)

Review comment:
       Does this work with an ACL with wildcard host?

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -164,4 +206,28 @@ public void testAclsIterator() {
     public void testAuthorizer() {
         aclAuthorizer.authorize(context, actions);
     }
+
+    @Benchmark
+    public void testAuthorizeByResourceType() {
+        aclAuthorizer.authorizeByResourceType(context, AclOperation.WRITE, 
ResourceType.TOPIC);
+    }
+
+    @Benchmark
+    public void testUpdateCache() {
+        AclAuthorizer aclAuthorizer = new AclAuthorizer();
+        scala.collection.mutable.Set<AclEntry> entries = new 
scala.collection.mutable.HashSet<>();
+        for (int i = 0; i < resourceCount; i ++){
+            scala.collection.immutable.Set<AclEntry> immutable = new 
scala.collection.immutable.HashSet<>();
+            for (int j = 0; j < aclCount; j++) {
+                entries.add(new AclEntry(new AccessControlEntry(
+                    principal.toString(), "127.0.0" + j, AclOperation.WRITE, 
AclPermissionType.ALLOW)));
+                immutable = entries.toSet();
+            }
+            aclAuthorizer.updateCache(

Review comment:
       It makes sense to merge the benchmarks to trunk. Let's make sure it 
measures just updateCache.

##########
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##########
@@ -100,8 +106,15 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown(): Unit = {
-    aclAuthorizer.close()
-    aclAuthorizer2.close()
+    val authorizers = Seq(aclAuthorizer, aclAuthorizer2)
+    authorizers.foreach(a => {
+      a.acls(AclBindingFilter.ANY).forEach(bd => {
+        removeAcls(aclAuthorizer, Set(bd.entry), bd.pattern())

Review comment:
       Why do we need this in tearDown?

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
         field.set(obj, value);
     }
 
-    private TreeMap<ResourcePattern, VersionedAcls> prepareAclCache() {
+    private void prepareAclCache() throws UnknownHostException {
         Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
         for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
             ResourcePattern resource = new ResourcePattern(
                 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-                resourceNamePrefix + resourceId,
+                resourceName(resourceNamePrefix),

Review comment:
       We should revert changes to existing benchmark because it hard to tell 
why these changes were made and what impact it has on the original benchmark. 

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -69,33 +73,44 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-    @Param({"10000", "50000", "200000"})
+    @Param({"200000"})
     private int resourceCount;
     //no. of. rules per resource
-    @Param({"10", "50"})
+    @Param({"50"})

Review comment:
       revert?

##########
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##########
@@ -1040,19 +1117,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
       securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
     val action = new Action(operation, resource, 1, true, true)
     authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
+  private def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+    authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {
     val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
     authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
       .map(_.toCompletableFuture.get)
       .foreach { result => result.exception.ifPresent { e => throw e } }
+    aclAdded += Tuple3(authorizer, aces, resourcePattern)
   }
 
-  private def removeAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {
+  private def removeAcls(authorizer: Authorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {

Review comment:
       a lot of these changes look unnecessary

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -69,33 +73,44 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-    @Param({"10000", "50000", "200000"})
+    @Param({"200000"})

Review comment:
       looks like this hasn't been reverted?

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##########
@@ -69,33 +73,44 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-    @Param({"10000", "50000", "200000"})
+    @Param({"200000"})
     private int resourceCount;
     //no. of. rules per resource
-    @Param({"10", "50"})
+    @Param({"50"})
     private int aclCount;
 
+    @Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"})
+    private double denyPercentage;
+
     private final int hostPreCount = 1000;
     private final String resourceNamePrefix = "foo-bar35_resource-";
+    private final String resourceName = resourceNamePrefix + 95;
 
     private final AclAuthorizer aclAuthorizer = new AclAuthorizer();
     private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     private List<Action> actions = new ArrayList<>();
     private RequestContext context;
 
+    private TreeMap<ResourcePattern, VersionedAcls> aclCache = new 
TreeMap<>(new AclAuthorizer.ResourceOrdering());
+    private scala.collection.mutable.HashMap<ResourceIndex, 
scala.collection.mutable.HashSet<String>> resourceCache =
+        new scala.collection.mutable.HashMap<>();
+    Random rand = new Random(System.currentTimeMillis());
+    double eps = 1e-9;
+
     @Setup(Level.Trial)
     public void setup() throws Exception {
-        setFieldValue(aclAuthorizer, 
AclAuthorizer.class.getDeclaredField("aclCache").getName(),
-            prepareAclCache());
+        prepareAclCache();
+        setFieldValue(aclAuthorizer, 
AclAuthorizer.class.getDeclaredField("aclCache").getName(), aclCache);
+        setFieldValue(aclAuthorizer, 
AclAuthorizer.class.getDeclaredField("resourceCache").getName(), resourceCache);
         // By adding `-95` to the resource name prefix, we cause the 
`TreeMap.from/to` call to return
         // most map entries. In such cases, we rely on the filtering based on 
`String.startsWith`
         // to return the matching ACLs. Using a more efficient data structure 
(e.g. a prefix
         // tree) should improve performance significantly).
         actions = Collections.singletonList(new Action(AclOperation.WRITE,
-            new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95, 
PatternType.LITERAL),
+            new ResourcePattern(ResourceType.TOPIC, resourceName, 
PatternType.LITERAL),

Review comment:
       why? This no longer reflects the comment above. Can we revert?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to