d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r534751595



##########
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##########
@@ -175,4 +179,73 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A
   override def close(): Unit = {
     baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    if (resourceType == ResourceType.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType")
+
+    if (resourceType == ResourceType.UNKNOWN)
+      throw new IllegalArgumentException("Unknown resource type")
+
+    if (op == AclOperation.ANY)
+      throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType")
+
+    if (op == AclOperation.UNKNOWN)
+      throw new IllegalArgumentException("Unknown operation type")
+
+    if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, op, resourceType)) {
+      AuthorizationResult.ALLOWED
+    } else {
+      super.authorizeByResourceType(requestContext, op, resourceType)
+    }
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+                      op: AclOperation,
+                      resourceType: ResourceType): Boolean = {
+    val resourceTypeFilter = new ResourcePatternFilter(
+      resourceType, null, PatternType.ANY)
+    val accessControlEntry = new AccessControlEntryFilter(
+      null, null, null, AclPermissionType.DENY)
+    val aclFilter = new AclBindingFilter(resourceTypeFilter, accessControlEntry)
+
+    for (binding <- acls(aclFilter).asScala) {
+      if (aceMatched(requestContext, op, binding) && canDenyAll(binding.pattern()))
+        return true
+    }
+    false
+  }
+
+  @inline
+  private def aceMatched(requestContext: AuthorizableRequestContext,
+                 op: AclOperation,
+                 binding: AclBinding): Boolean = {
+    (hostMatched(requestContext, binding) && principleMatched(requestContext, binding)
+      && operationMatched(op, binding))
+  }
+
+  @inline
+  private def hostMatched(requestContext: AuthorizableRequestContext,
+                  binding: AclBinding): Boolean =
+    (binding.entry().host().equals(requestContext.clientAddress().getHostAddress)
+      || binding.entry().host().equals(AclEntry.WildcardHost))
+
+  @inline
+  private def principleMatched(requestContext: AuthorizableRequestContext,
+                  binding: AclBinding): Boolean =
+    (binding.entry().principal().equals(requestContext.principal().toString)
+      || binding.entry().principal().equals(AclEntry.WildcardPrincipal.toString))
+
+  @inline
+  private def operationMatched(op: AclOperation,
+                       binding: AclBinding): Boolean =
+    (binding.entry().operation() == op
+      || binding.entry().operation() == AclOperation.ALL)
+
+  @inline
+  private def canDenyAll(pattern: ResourcePattern): Boolean =
+    pattern.patternType() == PatternType.LITERAL && pattern.name().equals(ResourcePattern.WILDCARD_RESOURCE)
+

Review comment:
       Yeah, moved to SecurityUtils in commit 30899c45ac50b70625baa2e5f12f58cfe9d79404.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to