This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae3a6ed  KAFKA-10619: Idempotent producer will get authorized once it 
has a WRITE access to at least one topic (KIP-679) (#9485)
ae3a6ed is described below

commit ae3a6ed990f91708686d27c6023bac050c422248
Author: Cheng Tan <[email protected]>
AuthorDate: Fri Dec 18 10:08:46 2020 -0800

    KAFKA-10619: Idempotent producer will get authorized once it has a WRITE 
access to at least one topic (KIP-679) (#9485)
    
    Includes:
    - New API to authorize by resource type
    - Default implementation for the method that supports super users and ACLs
    - Optimized implementation in AclAuthorizer that supports ACLs, super users 
and allow.everyone.if.no.acl.found
    - Benchmarks and tests
    - InitProducerIdRequest authorized for Cluster:IdempotentWrite or WRITE to 
any topic, ProduceRequest authorized only for topic even if idempotent
    
    Reviewers: Lucas Bradstreet <[email protected]>, Rajini Sivaram 
<[email protected]>
---
 checkstyle/suppressions.xml                        |   6 +-
 .../apache/kafka/common/utils/SecurityUtils.java   |  30 ++
 .../apache/kafka/server/authorizer/Authorizer.java | 155 ++++++++-
 .../kafka/security/authorizer/AclAuthorizer.scala  | 164 ++++++++-
 .../security/authorizer/AuthorizerWrapper.scala    |  57 +++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  16 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      | 108 +++++-
 .../security/authorizer/AclAuthorizerTest.scala    |  68 ++--
 .../AuthorizerInterfaceDefaultTest.scala           |  94 ++++++
 .../authorizer/AuthorizerWrapperTest.scala         | 106 ++++++
 .../security/authorizer/BaseAuthorizerTest.scala   | 375 +++++++++++++++++++++
 .../kafka/jmh/acl/AclAuthorizerBenchmark.java      | 125 +++++--
 12 files changed, 1201 insertions(+), 103 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7fb20ec..69df37d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -67,13 +67,13 @@
               
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory).java"/>
+              
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
 
     <suppress checks="JavaNCSS"
               
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
 
     <suppress checks="NPathComplexity"
-              
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient).java"/>
+              
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer).java"/>
 
     <suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
               files="CoordinatorClient.java"/>
@@ -100,7 +100,7 @@
               
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest|MetricsTest|TestSslUtils"/>
+              
files="MemoryRecordsTest|MetricsTest|TestSslUtils|AclAuthorizerBenchmark"/>
 
     <suppress 
checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
index 12defdc..88a4cfc 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.utils;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.auth.SecurityProviderCreator;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -146,4 +148,32 @@ public class SecurityUtils {
         }
         return builder.toString();
     }
+
+    public static void authorizeByResourceTypeCheckArgs(AclOperation op,
+                                                        ResourceType type) {
+        if (type == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for 
authorizeByResourceType");
+        }
+
+        if (type == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for 
authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+    }
+
+    public static boolean denyAll(ResourcePattern pattern) {
+        return pattern.patternType() == PatternType.LITERAL
+            && pattern.name().equals(ResourcePattern.WILDCARD_RESOURCE);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java 
b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index 1865e7e..17348a7 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -17,16 +17,29 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.io.Closeable;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionStage;
-
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
 
 /**
  *
@@ -139,4 +152,136 @@ public interface Authorizer extends Configurable, 
Closeable {
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on 
at least one
+     * resource of the given type.
+     *
+     * Custom authorizer implementations should consider overriding this 
default implementation because:
+     * 1. The default implementation iterates all AclBindings multiple times, 
without any caching
+     *    by principal, host, operation, permission types, and resource types. 
More efficient
+     *    implementations may be added in custom authorizers that directly 
access cached entries.
+     * 2. The default implementation cannot integrate with any audit logging 
included in the
+     *    authorizer implementation.
+     * 3. The default implementation does not support any custom authorizer 
configs or other access
+     *    rules apart from ACLs.
+     *
+     * @param requestContext Request context including request resourceType, 
security protocol and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized
+     *                       to perform the given ACL operation on at least 
one resource of the
+     *                       given type. Return {@link 
AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        // Check a hard-coded name to ensure that super users are granted
+        // access regardless of DENY ACLs.
+        if (authorize(requestContext, Collections.singletonList(new Action(
+                op, new ResourcePattern(resourceType, "hardcode", 
PatternType.LITERAL),
+                0, true, false)))
+                .get(0) == AuthorizationResult.ALLOWED) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        // Filter out all the resource pattern corresponding to the 
RequestContext,
+        // AclOperation, and ResourceType
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        EnumMap<PatternType, Set<String>> denyPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+        EnumMap<PatternType, Set<String>> allowPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+
+        boolean hasWildCardAllow = false;
+
+        KafkaPrincipal principal = new KafkaPrincipal(
+            requestContext.principal().getPrincipalType(),
+            requestContext.principal().getName());
+        String hostAddr = requestContext.clientAddress().getHostAddress();
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(hostAddr) && 
!binding.entry().host().equals("*"))
+                continue;
+
+            if 
(!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+                    && !binding.entry().principal().equals("User:*"))
+                continue;
+
+            if (binding.entry().operation() != op
+                    && binding.entry().operation() != AclOperation.ALL)
+                continue;
+
+            if (binding.entry().permissionType() == AclPermissionType.DENY) {
+                switch (binding.pattern().patternType()) {
+                    case LITERAL:
+                        // If wildcard deny exists, return deny directly
+                        if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+                            return AuthorizationResult.DENIED;
+                        
denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+                        break;
+                    case PREFIXED:
+                        
denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+                        break;
+                    default:
+                }
+                continue;
+            }
+
+            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+                continue;
+
+            switch (binding.pattern().patternType()) {
+                case LITERAL:
+                    if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+                        hasWildCardAllow = true;
+                        continue;
+                    }
+                    
allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+                    break;
+                case PREFIXED:
+                    
allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+                    break;
+                default:
+            }
+        }
+
+        if (hasWildCardAllow) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        // For any literal allowed, if there's no dominant literal and prefix 
denied, return allow.
+        // For any prefix allowed, if there's no dominant prefix denied, 
return allow.
+        for (Map.Entry<PatternType, Set<String>> entry : 
allowPatterns.entrySet()) {
+            for (String allowStr : entry.getValue()) {
+                if (entry.getKey() == PatternType.LITERAL
+                        && 
denyPatterns.get(PatternType.LITERAL).contains(allowStr))
+                    continue;
+                StringBuilder sb = new StringBuilder();
+                boolean hasDominatedDeny = false;
+                for (char ch : allowStr.toCharArray()) {
+                    sb.append(ch);
+                    if 
(denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
+                        hasDominatedDeny = true;
+                        break;
+                    }
+                }
+                if (!hasDominatedDeny)
+                    return AuthorizationResult.ALLOWED;
+            }
+        }
+
+        return AuthorizationResult.DENIED;
+    }
+
 }
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala 
b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 0a60e51..475baef 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -42,7 +42,7 @@ import org.apache.zookeeper.client.ZKClientConfig
 
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, mutable}
+import scala.collection.{Seq, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Random, Success, Try}
 
@@ -130,6 +130,11 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  @volatile
+  private var resourceCache = new 
scala.collection.immutable.HashMap[ResourceTypeKey,
+    scala.collection.immutable.HashSet[String]]()
+
   private val lock = new Object()
 
   // The maximum number of times we should try to update the resource acls in 
zookeeper before failing;
@@ -304,6 +309,130 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val resourceSnapshot = resourceCache
+    val principalStr = principal.toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      resourceSnapshot, principalStr, host, op, AclPermissionType.DENY, 
resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      resourceSnapshot, principalStr, host, op, AclPermissionType.DENY, 
resourceType, PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(resourceSnapshot, principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(resourceSnapshot, principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW, 
resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      resourceSnapshot, principalStr, host, op, AclPermissionType.ALLOW, 
resourceType, PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  private def matchingResources(resourceSnapshot: 
immutable.Map[ResourceTypeKey, immutable.Set[String]],
+                                principal: String, host: String, op: 
AclOperation, permission: AclPermissionType,
+                                resourceType: ResourceType, patternType: 
PatternType): ArrayBuffer[Set[String]] = {
+    val matched = ArrayBuffer[immutable.Set[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString);
+         h <- Set(host, AclEntry.WildcardHost);
+         o <- Set(op, AclOperation.ALL)) {
+      val resourceTypeKey = ResourceTypeKey(
+        new AccessControlEntry(p, h, o, permission), resourceType, patternType)
+      resourceSnapshot.get(resourceTypeKey) match {
+        case Some(resources) => matched += resources
+        case None =>
+      }
+    }
+    matched
+  }
+
+  private def hasMatchingResources(resourceSnapshot: 
immutable.Map[ResourceTypeKey, immutable.Set[String]],
+                                   principal: String, host: String, op: 
AclOperation, permission: AclPermissionType,
+                                   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString);
+         h <- Set(host, AclEntry.WildcardHost);
+         o <- Set(op, AclOperation.ALL)) {
+          val resourceTypeKey = ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          if (resourceSnapshot.contains(resourceTypeKey))
+            return true
+    }
+    false
+  }
+
+  private def denyAll(denyLiterals: ArrayBuffer[immutable.Set[String]]): 
Boolean =
+    denyLiterals.exists(_.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: ArrayBuffer[immutable.Set[String]], 
allowPrefixes: ArrayBuffer[immutable.Set[String]],
+                       denyLiterals: ArrayBuffer[immutable.Set[String]], 
denyPrefixes: ArrayBuffer[immutable.Set[String]]): Boolean = {
+    (allowPrefixes.exists(_.exists(prefix => allowPrefix(prefix, 
denyPrefixes)))
+      || allowLiterals.exists(_.exists(literal => allowLiteral(literal, 
denyLiterals, denyPrefixes))))
+  }
+
+  private def allowLiteral(literalName: String, denyLiterals: 
ArrayBuffer[immutable.Set[String]],
+                           denyPrefixes: ArrayBuffer[immutable.Set[String]]): 
Boolean = {
+    literalName match {
+      case ResourcePattern.WILDCARD_RESOURCE => true
+      case _ => !denyLiterals.exists(_.contains(literalName)) && 
!hasDominantPrefixedDeny(literalName, denyPrefixes)
+    }
+  }
+
+  private def allowPrefix(prefixName: String,
+                          denyPrefixes: ArrayBuffer[immutable.Set[String]]): 
Boolean = {
+    !hasDominantPrefixedDeny(prefixName, denyPrefixes)
+  }
+
+  private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: 
ArrayBuffer[immutable.Set[String]]): Boolean = {
+    val sb = new StringBuilder
+    for (ch <- resourceName.toCharArray) {
+      sb.append(ch)
+      if (denyPrefixes.exists(p => p.contains(sb.toString()))) {
+        return true
+      }
+    }
+    false
+  }
+
+
   private def authorizeAction(requestContext: AuthorizableRequestContext, 
action: Action): AuthorizationResult = {
     val resource = action.resourcePattern
     if (resource.patternType != PatternType.LITERAL) {
@@ -547,7 +676,34 @@ class AclAuthorizer extends Authorizer with Logging {
     zkClient.getVersionedAclsForResource(resource)
   }
 
-  private def updateCache(resource: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  // Visible for benchmark
+  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): 
Unit = {
+    val currentAces: Set[AccessControlEntry] = 
aclCache.get(resource).map(_.acls.map(_.ace)).getOrElse(Set.empty)
+    val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+    val acesToAdd = newAces.diff(currentAces)
+    val acesToRemove = currentAces.diff(newAces)
+
+    acesToAdd.foreach(ace => {
+      val resourceTypeKey = ResourceTypeKey(ace, resource.resourceType(), 
resource.patternType())
+      resourceCache.get(resourceTypeKey) match {
+        case Some(resources) => resourceCache += (resourceTypeKey -> 
(resources + resource.name()))
+        case None => resourceCache += (resourceTypeKey -> 
immutable.HashSet(resource.name()))
+      }
+    })
+    acesToRemove.foreach(ace => {
+      val resourceTypeKey = ResourceTypeKey(ace, resource.resourceType(), 
resource.patternType())
+      resourceCache.get(resourceTypeKey) match {
+        case Some(resources) =>
+          val newResources = resources - resource.name()
+          if (newResources.isEmpty) {
+            resourceCache -= resourceTypeKey
+          } else {
+            resourceCache += (resourceTypeKey -> newResources)
+          }
+        case None =>
+      }
+    })
+
     if (versionedAcls.acls.nonEmpty) {
       aclCache = aclCache.updated(resource, versionedAcls)
     } else {
@@ -582,4 +738,8 @@ class AclAuthorizer extends Authorizer with Logging {
       processAclChangeNotification(resource)
     }
   }
+
+  private case class ResourceTypeKey(ace: AccessControlEntry,
+                                     resourceType: ResourceType,
+                                     patternType: PatternType)
 }
diff --git 
a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala 
b/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
index 9a8bf9d..cc25fce 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
@@ -21,21 +21,23 @@ import java.util.concurrent.{CompletableFuture, 
CompletionStage}
 import java.{lang, util}
 
 import kafka.network.RequestChannel.Session
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource, 
ResourceType}
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, 
SimpleAclAuthorizer, ResourceType => ResourceTypeLegacy}
 import kafka.security.authorizer.AuthorizerWrapper._
 import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, 
AclBindingFilter}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
-import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.SecurityUtils
 import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal
 import 
org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, 
AuthorizerServerInfo, _}
 
-import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, immutable, mutable}
+import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
@@ -43,7 +45,7 @@ object AuthorizerWrapper {
 
   def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, 
(Resource, Acl)] = {
     (for {
-      resourceType <- 
Try(ResourceType.fromJava(filter.patternFilter.resourceType))
+      resourceType <- 
Try(ResourceTypeLegacy.fromJava(filter.patternFilter.resourceType))
       principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- 
Try(PermissionType.fromJava(filter.entryFilter.permissionType))
@@ -71,15 +73,20 @@ object AuthorizerWrapper {
   }
 
   def convertToResource(resourcePattern: ResourcePattern): Resource = {
-    Resource(ResourceType.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
+    Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
   }
 }
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
 class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.Authorizer) extends Authorizer {
 
+  var shouldAllowEveryoneIfNoAclIsFound = false
+
   override def configure(configs: util.Map[String, _]): Unit = {
     baseAuthorizer.configure(configs)
+    shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
+        
AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
+      && baseAuthorizer.isInstanceOf[SimpleAclAuthorizer])
   }
 
   override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ 
<: CompletionStage[Void]] = {
@@ -175,4 +182,42 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    if (super.authorizeByResourceType(requestContext, op, resourceType) == 
AuthorizationResult.ALLOWED)
+      AuthorizationResult.ALLOWED
+    else if (denyAllResource(requestContext, op, resourceType) || 
!shouldAllowEveryoneIfNoAclIsFound)
+      AuthorizationResult.DENIED
+    else
+      AuthorizationResult.ALLOWED
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+                              op: AclOperation,
+                              resourceType: ResourceType): Boolean = {
+    val resourceTypeFilter = new ResourcePatternFilter(
+      resourceType, Resource.WildCardResource, PatternType.LITERAL)
+    val principal = new KafkaPrincipal(
+      requestContext.principal.getPrincipalType, 
requestContext.principal.getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val entryFilter = new AccessControlEntryFilter(null, null, op, 
AclPermissionType.DENY)
+    val entryFilterAllOp = new AccessControlEntryFilter(null, null, 
AclOperation.ALL, AclPermissionType.DENY)
+    val aclFilter = new AclBindingFilter(resourceTypeFilter, entryFilter)
+    val aclFilterAllOp = new AclBindingFilter(resourceTypeFilter, 
entryFilterAllOp)
+
+    (acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), 
principal, host))
+      || acls(aclFilterAllOp).asScala.exists(b => 
principalHostMatch(b.entry(), principal, host)))
+  }
+
+  private def principalHostMatch(ace: AccessControlEntry,
+                                 principal: String,
+                                 host: String): Boolean = {
+    ((ace.host() == AclEntry.WildcardHost || ace.host() == host)
+      && (ace.principal() == AclEntry.WildcardPrincipalString || 
ace.principal() == principal))
+  }
+
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index dcb098b..f4758a6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -581,11 +581,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
-      // Note that authorization to a transactionalId implies ProducerId 
authorization
-
-    } else if (hasIdempotentRecords && !authorize(request.context, 
IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
-      sendErrorResponseMaybeThrottle(request, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
-      return
     }
 
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
@@ -2128,7 +2123,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
-    } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, 
CLUSTER_NAME)) {
+    } else if (!authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, 
CLUSTER_NAME, true, false)
+        && !authorizeByResourceType(request.context, AclOperation.WRITE, 
ResourceType.TOPIC)) {
       sendErrorResponseMaybeThrottle(request, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
       return
     }
@@ -3296,6 +3292,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def authorizeByResourceType(requestContext: RequestContext,
+                                      operation: AclOperation,
+                                      resourceType: ResourceType): Boolean = {
+    authorizer.forall { authZ =>
+      authZ.authorizeByResourceType(requestContext, operation, resourceType) 
== AuthorizationResult.ALLOWED
+    }
+  }
+
   // private package for testing
   private[server] def filterByAuthorized[T](requestContext: RequestContext,
                                             operation: AclOperation,
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e1ee87a..7dd12e4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.consumer._
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
@@ -52,7 +52,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
RecordBatch, Records, SimpleRecord}
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
@@ -1612,14 +1612,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
   @Test
   def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = {
     createTopic(topic)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, READ, ALLOW)), topicResource)
+    shouldIdempotentProducerFailInInitProducerId(true)
+  }
 
-    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), topicResource)
+  def shouldIdempotentProducerFailInInitProducerId(expectAuthException: 
Boolean): Unit = {
     val producer = buildIdempotentProducer()
     try {
       // the InitProducerId is sent asynchronously, so we expect the error 
either in the callback
       // or raised from send itself
       producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"hi".getBytes)).get()
-      fail("Should have raised ClusterAuthorizationException")
+      if (expectAuthException)
+        fail("Should have raised ClusterAuthorizationException")
     } catch {
       case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
@@ -1628,7 +1632,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       // the second time, the call to send itself should fail (the producer 
becomes unusable
       // if no producerId can be obtained)
       producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"hi".getBytes)).get()
-      fail("Should have raised ClusterAuthorizationException")
+      if (expectAuthException)
+        fail("Should have raised ClusterAuthorizationException")
     } catch {
       case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
@@ -1638,18 +1643,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest 
{
   @Test
   def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = {
     createTopic(topic)
-
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), topicResource)
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, IDEMPOTENT_WRITE, ALLOW)), clusterResource)
+    idempotentProducerShouldFailInProduce(() => removeAllClientAcls())
+  }
 
+  def idempotentProducerShouldFailInProduce(removeAclIdempotenceRequired: () 
=> Unit): Unit = {
     val producer = buildIdempotentProducer()
 
     // first send should be fine since we have permission to get a ProducerId
     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"hi".getBytes)).get()
 
     // revoke the IdempotentWrite permission
-    removeAllClientAcls()
-    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), topicResource)
+    removeAclIdempotenceRequired()
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResource)
 
     try {
       // the send should now fail with a cluster auth error
@@ -1657,7 +1664,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       fail("Should have raised ClusterAuthorizationException")
     } catch {
       case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+        assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
     }
     try {
       // the second time, the call to send itself should fail (the producer 
becomes unusable
@@ -1666,7 +1673,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       fail("Should have raised ClusterAuthorizationException")
     } catch {
       case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
+        assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
     }
   }
 
@@ -1792,6 +1799,87 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertFalse("Cluster id not returned", response.clusterId.isEmpty)
   }
 
+  @Test
+  def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
+    createTopic(topic)
+
+    for (_ <- 1 to 3) {
+      addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResource)
+      shouldIdempotentProducerFailInInitProducerId(true)
+
+      addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)), topicResource)
+      shouldIdempotentProducerFailInInitProducerId(false)
+
+      removeAllClientAcls()
+      addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResource)
+      shouldIdempotentProducerFailInInitProducerId(true)
+    }
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(): 
Unit = {
+    createTopic(topic)
+    createTopic("topic-2")
+    createTopic("to")
+
+    val unrelatedPrincipalString = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "unrelated").toString
+    val unrelatedTopicResource = new ResourcePattern(TOPIC, "topic-2", LITERAL)
+    val unrelatedGroupResource = new ResourcePattern(GROUP, "to", PREFIXED)
+
+    val acl1 = new AccessControlEntry(clientPrincipalString, WildcardHost, 
READ, DENY)
+    val acl2 = new AccessControlEntry(unrelatedPrincipalString, WildcardHost, 
READ, DENY)
+    val acl3 = new AccessControlEntry(clientPrincipalString, WildcardHost, 
WRITE, DENY)
+    val acl4 = new AccessControlEntry(clientPrincipalString, WildcardHost, 
WRITE, ALLOW)
+    val acl5 = new AccessControlEntry(clientPrincipalString, WildcardHost, 
DESCRIBE, ALLOW)
+
+    addAndVerifyAcls(Set(acl1, acl4, acl5), topicResource)
+    addAndVerifyAcls(Set(acl2, acl3), unrelatedTopicResource)
+    addAndVerifyAcls(Set(acl2, acl3), unrelatedGroupResource)
+    shouldIdempotentProducerFailInInitProducerId(false)
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeDenyTakesPrecedence(): Unit = {
+    createTopic(topic)
+    val allowWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)
+    addAndVerifyAcls(Set(allowWriteAce), topicResource)
+    shouldIdempotentProducerFailInInitProducerId(false)
+
+    val denyWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, DENY)
+    addAndVerifyAcls(Set(denyWriteAce), topicResource)
+    shouldIdempotentProducerFailInInitProducerId(true)
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeWildcardResourceDenyDominate(): Unit = {
+    createTopic(topic)
+    val wildcard = new ResourcePattern(TOPIC, 
ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+    val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED)
+    val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+    val allowWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)
+    val denyWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, DENY)
+
+    addAndVerifyAcls(Set(allowWriteAce), prefixed)
+    addAndVerifyAcls(Set(allowWriteAce), literal)
+    shouldIdempotentProducerFailInInitProducerId(false)
+
+    addAndVerifyAcls(Set(denyWriteAce), wildcard)
+    shouldIdempotentProducerFailInInitProducerId(true)
+  }
+
+  @Test
+  def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = {
+    createTopic(topic)
+    val prefixed = new ResourcePattern(TOPIC, topic.substring(0, 1), PREFIXED)
+    val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+    val allowWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)
+    val denyWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, DENY)
+
+    addAndVerifyAcls(Set(denyWriteAce), prefixed)
+    addAndVerifyAcls(Set(allowWriteAce), literal)
+    shouldIdempotentProducerFailInInitProducerId(true)
+  }
+
   def removeAllClientAcls(): Unit = {
     val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
     val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, 
null, AclOperation.ANY, AclPermissionType.ANY)
diff --git 
a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index a5c57b6..dff935f 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -34,16 +34,13 @@ import org.apache.kafka.common.acl._
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
 import org.apache.kafka.common.errors.{ApiException, 
UnsupportedVersionException}
-import org.apache.kafka.common.network.ClientInformation
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.RequestContext
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
 import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
 import org.junit.Assert._
@@ -53,7 +50,7 @@ import org.scalatest.Assertions.intercept
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
-class AclAuthorizerTest extends ZooKeeperTestHarness {
+class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
 
   private val allowReadAcl = new AccessControlEntry(WildcardPrincipalString, 
WildcardHost, READ, ALLOW)
   private val allowWriteAcl = new AccessControlEntry(WildcardPrincipalString, 
WildcardHost, WRITE, ALLOW)
@@ -66,18 +63,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
 
   private val aclAuthorizer = new AclAuthorizer
   private val aclAuthorizer2 = new AclAuthorizer
-  private var resource: ResourcePattern = _
-  private val superUsers = "User:superuser1; User:superuser2"
-  private val username = "alice"
-  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username)
-  private val requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
-  private var config: KafkaConfig = _
-  private var zooKeeperClient: ZooKeeperClient = _
 
   class CustomPrincipal(principalType: String, name: String) extends 
KafkaPrincipal(principalType, name) {
     override def equals(o: scala.Any): Boolean = false
   }
 
+  override def authorizer: Authorizer = aclAuthorizer
+
   @Before
   override def setUp(): Unit = {
     super.setUp()
@@ -988,6 +980,26 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
     }
   }
 
+  @Test
+  def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+
+    val cfg = KafkaConfig.fromProps(props)
+    val aclAuthorizer = new AclAuthorizer
+    try {
+      aclAuthorizer.configure(cfg.originals)
+      assertTrue("If allow.everyone.if.no.acl.found = true, " +
+        "caller should have read access to at least one topic",
+        authorizeByResourceType(aclAuthorizer, requestContext, READ, 
resource.resourceType()))
+      assertTrue("If allow.everyone.if.no.acl.found = true, " +
+        "caller should have write access to at least one topic",
+        authorizeByResourceType(aclAuthorizer, requestContext, WRITE, 
resource.resourceType()))
+    } finally {
+      aclAuthorizer.close()
+    }
+  }
+
   private def givenAuthorizerWithProtocolVersion(protocolVersion: 
Option[ApiVersion]): Unit = {
     aclAuthorizer.close()
 
@@ -1033,41 +1045,11 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
     acls
   }
 
-  private def newRequestContext(principal: KafkaPrincipal, clientAddress: 
InetAddress, apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = {
-    val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-    val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short 
version, String clientId, int correlation
-    new RequestContext(header, "", clientAddress, principal, 
ListenerName.forSecurityProtocol(securityProtocol),
-      securityProtocol, ClientInformation.EMPTY, false)
-  }
-
   private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
     val action = new Action(operation, resource, 1, true, true)
     authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
-    val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
-    authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
-      .map(_.toCompletableFuture.get)
-      .foreach { result => result.exception.ifPresent { e => throw e } }
-  }
-
-  private def removeAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {
-    val bindings = if (aces.isEmpty)
-      Set(new AclBindingFilter(resourcePattern.toFilter, 
AccessControlEntryFilter.ANY) )
-    else
-      aces.map { ace => new AclBinding(resourcePattern, ace).toFilter }
-    authorizer.deleteAcls(requestContext, bindings.toList.asJava).asScala
-      .map(_.toCompletableFuture.get)
-      .forall { result =>
-        result.exception.ifPresent { e => throw e }
-        result.aclBindingDeleteResults.forEach { r =>
-          r.exception.ifPresent { e => throw e }
-        }
-        !result.aclBindingDeleteResults.isEmpty
-      }
-  }
-
   private def getAcls(authorizer: AclAuthorizer, resourcePattern: 
ResourcePattern): Set[AccessControlEntry] = {
     val acls = authorizer.acls(new AclBindingFilter(resourcePattern.toFilter, 
AccessControlEntryFilter.ANY)).asScala.toSet
     acls.map(_.entry)
diff --git 
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
 
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
new file mode 100644
index 0000000..11cb9c7
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with 
BaseAuthorizerTest {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+
+  override def authorizer: Authorizer = interfaceDefaultAuthorizer
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    // Increase maxUpdateRetries to avoid transient failures
+    interfaceDefaultAuthorizer.authorizer.maxUpdateRetries = Int.MaxValue
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    props.put(AclAuthorizer.SuperUsersProp, superUsers)
+
+    config = KafkaConfig.fromProps(props)
+    interfaceDefaultAuthorizer.authorizer.configure(config.originals)
+
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,
+      Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest")
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    interfaceDefaultAuthorizer.close()
+    zooKeeperClient.close()
+    super.tearDown()
+  }
+
+  class DelegateAuthorizer extends Authorizer {
+    val authorizer = new AclAuthorizer
+
+    override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ 
<: CompletionStage[Void]] = {
+      authorizer.start(serverInfo)
+    }
+
+    override def authorize(requestContext: AuthorizableRequestContext, 
actions: util.List[Action]): util.List[AuthorizationResult] = {
+      authorizer.authorize(requestContext, actions)
+    }
+
+    override def createAcls(requestContext: AuthorizableRequestContext, 
aclBindings: util.List[AclBinding]): util.List[_ <: 
CompletionStage[AclCreateResult]] = {
+      authorizer.createAcls(requestContext, aclBindings)
+    }
+
+    override def deleteAcls(requestContext: AuthorizableRequestContext, 
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: 
CompletionStage[AclDeleteResult]] = {
+      authorizer.deleteAcls(requestContext, aclBindingFilters)
+    }
+
+    override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
+      authorizer.acls(filter)
+    }
+
+    override def configure(configs: util.Map[String, _]): Unit = {
+      authorizer.configure(configs)
+    }
+
+    override def close(): Unit = {
+      authorizer.close()
+    }
+  }
+
+}
diff --git 
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
 
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
new file mode 100644
index 0000000..ec8ca08
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.UUID
+
+import kafka.security.auth.SimpleAclAuthorizer
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.annotation.nowarn
+
+class AuthorizerWrapperTest extends ZooKeeperTestHarness with 
BaseAuthorizerTest {
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new 
SimpleAclAuthorizer)
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new 
SimpleAclAuthorizer)
+
+  override def authorizer: Authorizer = wrappedSimpleAuthorizer
+
+  @Before
+  @nowarn("cat=deprecation")
+  override def setUp(): Unit = {
+    super.setUp()
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+
+    props.put(AclAuthorizer.SuperUsersProp, superUsers)
+    config = KafkaConfig.fromProps(props)
+    wrappedSimpleAuthorizer.configure(config.originals)
+
+    props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+    config = KafkaConfig.fromProps(props)
+    wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
+
+    resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,
+      Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    val authorizers = Seq(wrappedSimpleAuthorizer, 
wrappedSimpleAuthorizerAllowEveryone)
+    authorizers.foreach(a => {
+      a.close()
+    })
+    zooKeeperClient.close()
+    super.tearDown()
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeEnableAllowEveryOne(): Unit = {
+    testAuthorizeByResourceTypeEnableAllowEveryOne(wrappedSimpleAuthorizer)
+  }
+
+  private def testAuthorizeByResourceTypeEnableAllowEveryOne(authorizer: 
Authorizer): Unit = {
+    assertTrue("If allow.everyone.if.no.acl.found = true, " +
+      "caller should have read access to at least one topic",
+      authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, 
requestContext, READ, resource.resourceType()))
+    val allUser = AclEntry.WildcardPrincipalString
+    val allHost = AclEntry.WildcardHost
+    val denyAll = new AccessControlEntry(allUser, allHost, ALL, 
AclPermissionType.DENY)
+    val wildcardResource = new ResourcePattern(resource.resourceType(), 
AclEntry.WildcardResource, LITERAL)
+
+    addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), resource)
+    assertTrue("Should still allow since the deny only apply on the specific 
resource",
+      authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, 
requestContext, READ, resource.resourceType()))
+
+    addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), 
wildcardResource)
+    assertFalse("When an ACL binding which can deny all users and hosts 
exists, " +
+      "even if allow.everyone.if.no.acl.found = true, caller shouldn't have 
read access any topic",
+      authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, 
requestContext, READ, resource.resourceType()))
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeDisableAllowEveryoneOverride(): Unit = {
+    assertFalse ("If allow.everyone.if.no.acl.found = false, " +
+      "caller shouldn't have read access to any topic",
+      authorizeByResourceType(wrappedSimpleAuthorizer, requestContext, READ, 
resource.resourceType()))
+  }
+}
diff --git 
a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
new file mode 100644
index 0000000..001fd24
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
+import kafka.server.KafkaConfig
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE}
+import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
+import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
+import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, 
TRANSACTIONAL_ID}
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.{AuthorizationResult, Authorizer}
+import org.junit.Assert.{assertFalse, assertTrue}
+import org.junit.Test
+
+import scala.jdk.CollectionConverters._
+
+trait BaseAuthorizerTest {
+
+  def authorizer: Authorizer
+
+  val superUsers = "User:superuser1; User:superuser2"
+  val username = "alice"
+  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+  val requestContext: RequestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
+  val superUserName = "superuser1"
+  var config: KafkaConfig = _
+  var zooKeeperClient: ZooKeeperClient = _
+  var resource: ResourcePattern = _
+
+  @Test
+  def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+    val denyRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
+    val allowRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
+    val u1h1Context = newRequestContext(user1, host1)
+
+    for (_ <- 1 to 10) {
+      assertFalse("User1 from host1 should not have READ access to any topic 
when no ACL exists",
+        authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+      addAcls(authorizer, Set(allowRead), resource1)
+      assertTrue("User1 from host1 now should have READ access to at least one 
topic",
+        authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+      for (_ <- 1 to 10) {
+        addAcls(authorizer, Set(denyRead), resource1)
+        assertFalse("User1 from host1 now should not have READ access to any 
topic",
+          authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+        removeAcls(authorizer, Set(denyRead), resource1)
+        addAcls(authorizer, Set(allowRead), resource1)
+        assertTrue("User1 from host1 now should have READ access to at least 
one topic",
+          authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+      }
+
+      removeAcls(authorizer, Set(allowRead), resource1)
+      assertFalse("User1 from host1 now should not have READ access to any 
topic",
+        authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+    }
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(): 
Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val host2 = InetAddress.getByName("192.168.1.2")
+    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+    val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), 
LITERAL)
+    val resource3 = new ResourcePattern(GROUP, "s", PREFIXED)
+
+    val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
READ, DENY)
+    val acl2 = new AccessControlEntry(user2.toString, host1.getHostAddress, 
READ, DENY)
+    val acl3 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
WRITE, DENY)
+    val acl4 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
READ, DENY)
+    val acl5 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
READ, DENY)
+    val acl6 = new AccessControlEntry(user2.toString, host2.getHostAddress, 
READ, DENY)
+    val acl7 = new AccessControlEntry(user1.toString, host2.getHostAddress, 
READ, ALLOW)
+
+    addAcls(authorizer, Set(acl1, acl2, acl3, acl6, acl7), resource1)
+    addAcls(authorizer, Set(acl4), resource2)
+    addAcls(authorizer, Set(acl5), resource3)
+
+    val u1h1Context = newRequestContext(user1, host1)
+    val u1h2Context = newRequestContext(user1, host2)
+
+    assertFalse("User1 from host1 should not have READ access to any topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+    assertFalse("User1 from host1 should not have READ access to any consumer 
group",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP))
+    assertFalse("User1 from host1 should not have READ access to any 
transactional id",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TRANSACTIONAL_ID))
+    assertFalse("User1 from host1 should not have READ access to any cluster 
resource",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.CLUSTER))
+    assertTrue("User1 from host2 should have READ access to at least one 
topic",
+      authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC))
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeDenyTakesPrecedence(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+
+    val u1h1Context = newRequestContext(user1, host1)
+    val acl1 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
WRITE, ALLOW)
+    val acl2 = new AccessControlEntry(user1.toString, host1.getHostAddress, 
WRITE, DENY)
+
+    addAcls(authorizer, Set(acl1), resource1)
+    assertTrue("User1 from host1 should have WRITE access to at least one 
topic",
+      authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(acl2), resource1)
+    assertFalse("User1 from host1 should not have WRITE access to any topic",
+      authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.TOPIC))
+  }
+
+  @Test
+  def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val a = new ResourcePattern(GROUP, "a", PREFIXED)
+    val ab = new ResourcePattern(GROUP, "ab", PREFIXED)
+    val abc = new ResourcePattern(GROUP, "abc", PREFIXED)
+    val abcd = new ResourcePattern(GROUP, "abcd", PREFIXED)
+    val abcde = new ResourcePattern(GROUP, "abcde", PREFIXED)
+
+    val u1h1Context = newRequestContext(user1, host1)
+    val allowAce = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
+    val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress, 
READ, DENY)
+
+    addAcls(authorizer, Set(allowAce), abcde)
+    assertTrue("User1 from host1 should have READ access to at least one 
group",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP))
+
+    addAcls(authorizer, Set(denyAce), abcd)
+    assertFalse("User1 from host1 now should not have READ access to any 
group",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP))
+
+    addAcls(authorizer, Set(allowAce), abc)
+    assertTrue("User1 from host1 now should have READ access to any group",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP))
+
+    addAcls(authorizer, Set(denyAce), a)
+    assertFalse("User1 from host1 now should not have READ access to any 
group",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP))
+
+    addAcls(authorizer, Set(allowAce), ab)
+    assertFalse("User1 from host1 still should not have READ access to any 
group",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.GROUP))
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeWildcardResourceDenyDominate(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val wildcard = new ResourcePattern(GROUP, 
ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+    val prefixed = new ResourcePattern(GROUP, "hello", PREFIXED)
+    val literal = new ResourcePattern(GROUP, "aloha", LITERAL)
+
+    val u1h1Context = newRequestContext(user1, host1)
+    val allowAce = new AccessControlEntry(user1.toString, 
host1.getHostAddress, WRITE, ALLOW)
+    val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress, 
WRITE, DENY)
+
+    addAcls(authorizer, Set(allowAce), prefixed)
+    assertTrue("User1 from host1 should have WRITE access to at least one 
group",
+      authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP))
+
+    addAcls(authorizer, Set(denyAce), wildcard)
+    assertFalse("User1 from host1 now should not have WRITE access to any 
group",
+      authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP))
+
+    addAcls(authorizer, Set(allowAce), wildcard)
+    assertFalse("User1 from host1 still should not have WRITE access to any 
group",
+      authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP))
+
+    addAcls(authorizer, Set(allowAce), literal)
+    assertFalse("User1 from host1 still should not have WRITE access to any 
group",
+      authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.GROUP))
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeWithAllOperationAce(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+    val denyAll = new AccessControlEntry(user1.toString, host1.getHostAddress, 
ALL, DENY)
+    val allowAll = new AccessControlEntry(user1.toString, 
host1.getHostAddress, ALL, ALLOW)
+    val denyWrite = new AccessControlEntry(user1.toString, 
host1.getHostAddress, WRITE, DENY)
+    val u1h1Context = newRequestContext(user1, host1)
+
+    assertFalse("User1 from host1 should not have READ access to any topic 
when no ACL exists",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyWrite, allowAll), resource1)
+    assertTrue("User1 from host1 now should have READ access to at least one 
topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyAll), resource1)
+    assertFalse("User1 from host1 now should not have READ access to any 
topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeWithAllHostAce(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val host2 = InetAddress.getByName("192.168.1.2")
+    val allHost = AclEntry.WildcardHost
+    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+    val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), 
LITERAL)
+    val allowHost1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
+    val denyHost1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
+    val denyAllHost = new AccessControlEntry(user1.toString, allHost, READ, 
DENY)
+    val allowAllHost = new AccessControlEntry(user1.toString, allHost, READ, 
ALLOW)
+    val u1h1Context = newRequestContext(user1, host1)
+    val u1h2Context = newRequestContext(user1, host2)
+
+    assertFalse("User1 from host1 should not have READ access to any topic 
when no ACL exists",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(allowHost1), resource1)
+    assertTrue("User1 from host1 should now have READ access to at least one 
topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyAllHost), resource1)
+    assertFalse("User1 from host1 now shouldn't have READ access to any topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyHost1), resource2)
+    assertFalse("User1 from host1 still should not have READ access to any 
topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+    assertFalse("User1 from host2 should not have READ access to any topic",
+      authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(allowAllHost), resource2)
+    assertTrue("User1 from host2 should now have READ access to at least one 
topic",
+      authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyAllHost), resource2)
+    assertFalse("User1 from host2 now shouldn't have READ access to any topic",
+      authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC))
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeWithAllPrincipalAce(): Unit = {
+    val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+    val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
+    val allUser = AclEntry.WildcardPrincipalString
+    val host1 = InetAddress.getByName("192.168.1.1")
+    val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+    val resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), 
LITERAL)
+    val allowUser1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
+    val denyUser1 = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
+    val denyAllUser = new AccessControlEntry(allUser, host1.getHostAddress, 
READ, DENY)
+    val allowAllUser = new AccessControlEntry(allUser, host1.getHostAddress, 
READ, ALLOW)
+    val u1h1Context = newRequestContext(user1, host1)
+    val u2h1Context = newRequestContext(user2, host1)
+
+    assertFalse("User1 from host1 should not have READ access to any topic 
when no ACL exists",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(allowUser1), resource1)
+    assertTrue("User1 from host1 should now have READ access to at least one 
topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyAllUser), resource1)
+    assertFalse("User1 from host1 now shouldn't have READ access to any topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyUser1), resource2)
+    assertFalse("User1 from host1 still should not have READ access to any 
topic",
+      authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+    assertFalse("User2 from host1 should not have READ access to any topic",
+      authorizeByResourceType(authorizer, u2h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(allowAllUser), resource2)
+    assertTrue("User2 from host1 should now have READ access to at least one 
topic",
+      authorizeByResourceType(authorizer, u2h1Context, READ, 
ResourceType.TOPIC))
+
+    addAcls(authorizer, Set(denyAllUser), resource2)
+    assertFalse("User2 from host1 now shouldn't have READ access to any topic",
+      authorizeByResourceType(authorizer, u2h1Context, READ, 
ResourceType.TOPIC))
+  }
+
+  @Test
+  def testAuthorzeByResourceTypeSuperUserHasAccess(): Unit = {
+    val denyAllAce = new AccessControlEntry(WildcardPrincipalString, 
WildcardHost, AclOperation.ALL, DENY)
+    val superUser1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
superUserName)
+    val host1 = InetAddress.getByName("192.0.4.4")
+    val allTopicsResource = new ResourcePattern(TOPIC, WILDCARD_RESOURCE, 
LITERAL)
+    val clusterResource = new ResourcePattern(CLUSTER, WILDCARD_RESOURCE, 
LITERAL)
+    val groupResource = new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL)
+    val transactionIdResource = new ResourcePattern(TRANSACTIONAL_ID, 
WILDCARD_RESOURCE, LITERAL)
+
+    addAcls(authorizer, Set(denyAllAce), allTopicsResource)
+    addAcls(authorizer, Set(denyAllAce), clusterResource)
+    addAcls(authorizer, Set(denyAllAce), groupResource)
+    addAcls(authorizer, Set(denyAllAce), transactionIdResource)
+
+    val superUserContext = newRequestContext(superUser1, host1)
+
+    assertTrue("superuser always has access, no matter what acls.",
+      authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.TOPIC))
+    assertTrue("superuser always has access, no matter what acls.",
+      authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.CLUSTER))
+    assertTrue("superuser always has access, no matter what acls.",
+      authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.GROUP))
+    assertTrue("superuser always has access, no matter what acls.",
+      authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.TRANSACTIONAL_ID))
+  }
+
+  def newRequestContext(principal: KafkaPrincipal, clientAddress: InetAddress, 
apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = {
+    val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+    val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short 
version, String clientId, int correlation
+    new RequestContext(header, "", clientAddress, principal, 
ListenerName.forSecurityProtocol(securityProtocol),
+      securityProtocol, ClientInformation.EMPTY, false)
+  }
+
+  def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+    authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {
+    val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
+    authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
+      .map(_.toCompletableFuture.get)
+      .foreach { result => result.exception.ifPresent { e => throw e } }
+  }
+
+  def removeAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Boolean = {
+    val bindings = if (aces.isEmpty)
+      Set(new AclBindingFilter(resourcePattern.toFilter, 
AccessControlEntryFilter.ANY) )
+    else
+      aces.map { ace => new AclBinding(resourcePattern, ace).toFilter }
+    authorizer.deleteAcls(requestContext, bindings.toList.asJava).asScala
+      .map(_.toCompletableFuture.get)
+      .forall { result =>
+        result.exception.ifPresent { e => throw e }
+        result.aclBindingDeleteResults.forEach { r =>
+          r.exception.ifPresent { e => throw e }
+        }
+        !result.aclBindingDeleteResults.isEmpty
+      }
+  }
+
+}
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
index 060ff3a..65aa2a1 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
@@ -49,9 +49,7 @@ import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 import scala.collection.JavaConverters;
-import scala.collection.immutable.TreeMap;
 
-import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -59,7 +57,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 @State(Scope.Benchmark)
@@ -75,18 +75,27 @@ public class AclAuthorizerBenchmark {
     @Param({"10", "50"})
     private int aclCount;
 
+    @Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"})
+    private double denyPercentage;
+
     private final int hostPreCount = 1000;
     private final String resourceNamePrefix = "foo-bar35_resource-";
-
     private final AclAuthorizer aclAuthorizer = new AclAuthorizer();
     private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     private List<Action> actions = new ArrayList<>();
-    private RequestContext context;
+    private RequestContext authorizeContext;
+    private RequestContext authorizeByResourceTypeContext;
+    private String authorizeByResourceTypeHostName = "127.0.0.2";
+
+    private HashMap<ResourcePattern, AclAuthorizer.VersionedAcls> aclToUpdate 
= new HashMap<>();
+
+    Random rand = new Random(System.currentTimeMillis());
+    double eps = 1e-9;
 
     @Setup(Level.Trial)
     public void setup() throws Exception {
-        setFieldValue(aclAuthorizer, 
AclAuthorizer.class.getDeclaredField("aclCache").getName(),
-            prepareAclCache());
+        prepareAclCache();
+        prepareAclToUpdate();
         // By adding `-95` to the resource name prefix, we cause the 
`TreeMap.from/to` call to return
         // most map entries. In such cases, we rely on the filtering based on 
`String.startsWith`
         // to return the matching ACLs. Using a more efficient data structure 
(e.g. a prefix
@@ -94,18 +103,15 @@ public class AclAuthorizerBenchmark {
         actions = Collections.singletonList(new Action(AclOperation.WRITE,
             new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95, 
PatternType.LITERAL),
             1, true, true));
-        context = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, 
Integer.valueOf(1).shortValue(),
-            "someclient", 1), "1", InetAddress.getLocalHost(), 
KafkaPrincipal.ANONYMOUS,
+        authorizeContext = new RequestContext(new 
RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
+            "someclient", 1), "1", InetAddress.getByName("127.0.0.1"), 
principal,
+            ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, 
ClientInformation.EMPTY, false);
+        authorizeByResourceTypeContext = new RequestContext(new 
RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
+            "someclient", 1), "1", 
InetAddress.getByName(authorizeByResourceTypeHostName), principal,
             ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, 
ClientInformation.EMPTY, false);
     }
 
-    private void setFieldValue(Object obj, String fieldName, Object value) 
throws Exception {
-        Field field = obj.getClass().getDeclaredField(fieldName);
-        field.setAccessible(true);
-        field.set(obj, value);
-    }
-
-    private TreeMap<ResourcePattern, VersionedAcls> prepareAclCache() {
+    private void prepareAclCache() {
         Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
         for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
             ResourcePattern resource = new ResourcePattern(
@@ -116,9 +122,20 @@ public class AclAuthorizerBenchmark {
             Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> 
new HashSet<>());
 
             for (int aclId = 0; aclId < aclCount; aclId++) {
-                AccessControlEntry ace = new 
AccessControlEntry(principal.toString() + aclId,
-                    "*", AclOperation.READ, AclPermissionType.ALLOW);
-                entries.add(new AclEntry(ace));
+                // The principal in the request context we are using
+                // is principal.toString without any suffix
+                String principalName = principal.toString() + (aclId == 0 ? "" 
: aclId);
+                AccessControlEntry allowAce = new AccessControlEntry(
+                    principalName, "*", AclOperation.READ, 
AclPermissionType.ALLOW);
+
+                entries.add(new AclEntry(allowAce));
+
+                if (shouldDeny()) {
+                    // dominantly deny the resource
+                    AccessControlEntry denyAce = new AccessControlEntry(
+                        principalName, "*", AclOperation.READ, 
AclPermissionType.DENY);
+                    entries.add(new AclEntry(denyAce));
+                }
             }
         }
 
@@ -126,9 +143,16 @@ public class AclAuthorizerBenchmark {
             PatternType.PREFIXED);
         Set<AclEntry> entriesPrefix = 
aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
         for (int hostId = 0; hostId < hostPreCount; hostId++) {
-            AccessControlEntry ace = new 
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
+            AccessControlEntry allowAce = new 
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
                 AclOperation.READ, AclPermissionType.ALLOW);
-            entriesPrefix.add(new AclEntry(ace));
+            entriesPrefix.add(new AclEntry(allowAce));
+
+            if (shouldDeny()) {
+                // dominantly deny the resource
+                AccessControlEntry denyAce = new 
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
+                    AclOperation.READ, AclPermissionType.DENY);
+                entriesPrefix.add(new AclEntry(denyAce));
+            }
         }
 
         ResourcePattern resourceWildcard = new 
ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE,
@@ -136,18 +160,50 @@ public class AclAuthorizerBenchmark {
         Set<AclEntry> entriesWildcard = 
aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
         // get dynamic entries number for wildcard acl
         for (int hostId = 0; hostId < resourceCount / 10; hostId++) {
-            AccessControlEntry ace = new 
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
+            String hostName = "127.0.0" + hostId;
+            // AuthorizeByResourceType is optimizing the wildcard deny case.
+            // If we didn't skip the host, we would end up having a biased 
short runtime.
+            if (hostName.equals(authorizeByResourceTypeHostName)) {
+                continue;
+            }
+
+            AccessControlEntry allowAce = new 
AccessControlEntry(principal.toString(), hostName,
                 AclOperation.READ, AclPermissionType.ALLOW);
-            entriesWildcard.add(new AclEntry(ace));
+            entriesWildcard.add(new AclEntry(allowAce));
+            if (shouldDeny()) {
+                AccessControlEntry denyAce = new 
AccessControlEntry(principal.toString(), hostName,
+                    AclOperation.READ, AclPermissionType.DENY);
+                entriesWildcard.add(new AclEntry(denyAce));
+            }
+        }
+
+        for (Map.Entry<ResourcePattern, Set<AclEntry>> entryMap : 
aclEntries.entrySet()) {
+            aclAuthorizer.updateCache(entryMap.getKey(),
+                new 
VersionedAcls(JavaConverters.asScalaSetConverter(entryMap.getValue()).asScala().toSet(),
 1));
         }
+    }
 
-        TreeMap<ResourcePattern, VersionedAcls> aclCache = new TreeMap<>(new 
AclAuthorizer.ResourceOrdering());
-        for (Map.Entry<ResourcePattern, Set<AclEntry>> entry : 
aclEntries.entrySet()) {
-            aclCache = aclCache.updated(entry.getKey(),
-                new 
VersionedAcls(JavaConverters.asScalaSetConverter(entry.getValue()).asScala().toSet(),
 1));
+    private void prepareAclToUpdate() {
+        scala.collection.mutable.Set<AclEntry> entries = new 
scala.collection.mutable.HashSet<>();
+        for (int i = 0; i < resourceCount; i++) {
+            scala.collection.immutable.Set<AclEntry> immutable = new 
scala.collection.immutable.HashSet<>();
+            for (int j = 0; j < aclCount; j++) {
+                entries.add(new AclEntry(new AccessControlEntry(
+                    principal.toString(), "127.0.0" + j, AclOperation.WRITE, 
AclPermissionType.ALLOW)));
+                immutable = entries.toSet();
+            }
+            aclToUpdate.put(
+                new ResourcePattern(ResourceType.TOPIC, 
randomResourceName(resourceNamePrefix), PatternType.LITERAL),
+                new AclAuthorizer.VersionedAcls(immutable, i));
         }
+    }
 
-        return aclCache;
+    private String randomResourceName(String prefix) {
+        return prefix + UUID.randomUUID().toString().substring(0, 5);
+    }
+
+    private Boolean shouldDeny() {
+        return rand.nextDouble() * 100.0 - eps < denyPercentage;
     }
 
     @TearDown(Level.Trial)
@@ -162,6 +218,19 @@ public class AclAuthorizerBenchmark {
 
     @Benchmark
     public void testAuthorizer() {
-        aclAuthorizer.authorize(context, actions);
+        aclAuthorizer.authorize(authorizeContext, actions);
+    }
+
    /**
     * Benchmarks the resource-type-level authorization check added by KIP-679
     * against the pre-populated ACL cache, using the dedicated context whose
     * host is excluded from the wildcard entries (see prepareAclCache).
     */
    @Benchmark
    public void testAuthorizeByResourceType() {
        aclAuthorizer.authorizeByResourceType(authorizeByResourceTypeContext, AclOperation.READ, ResourceType.TOPIC);
    }
+
+    @Benchmark
+    public void testUpdateCache() {
+        AclAuthorizer aclAuthorizer = new AclAuthorizer();
+        for (Map.Entry<ResourcePattern, VersionedAcls> e : 
aclToUpdate.entrySet()) {
+            aclAuthorizer.updateCache(e.getKey(), e.getValue());
+        }
     }
 }

Reply via email to