d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r534755163
########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + + if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + + if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + + if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString + + val denyPatterns = matchingPatterns( + principal, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.DENY + ) + + if (denyAll(denyPatterns)) { + logAuditMessage(requestContext, new Action(op, null,0, true, true), false, false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, new Action(op, null, 0, true, true), true, false) + return AuthorizationResult.ALLOWED + } + + val allowPatterns = matchingPatterns( + principal, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.ALLOW + ) + + if (allowAny(allowPatterns, denyPatterns)) { + logAuditMessage(requestContext, new Action(op,null, 0, true, true), true, 
false) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, new Action(op, null, 0, true, true), false, false) + AuthorizationResult.DENIED + } + + def matchingPatterns(principal: String, host: String, op: AclOperation, + resourceType: ResourceType, + permission: AclPermissionType): Set[ResourcePattern] = { + var resources = Set[ResourcePattern]() + for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + resourceCache.get(ace) match { + case Some(r) => resources ++= r.filter(r => r.resourceType() == resourceType) + case None => + } + } + } + } + resources + } + + private def denyAll(denyResources: Set[ResourcePattern]): Boolean = + denyResources.exists(rp => denyAll(rp)) + + private def denyAll(rp: ResourcePattern): Boolean = + rp.patternType() == PatternType.LITERAL && rp.name() == ResourcePattern.WILDCARD_RESOURCE + + private def allowAny(allowPatterns: Set[ResourcePattern], denyPatterns: Set[ResourcePattern]): Boolean = + allowPatterns.exists(pattern => allow(pattern, denyPatterns)) + + private def allow(pattern: ResourcePattern, denyPatterns: Set[ResourcePattern]): Boolean = { Review comment: Now the only difference is that the AclAuthorizer indexes on ACEs, so the number of ACEs won't affect query efficiency. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org