d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539787542
########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED + } + + val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { + var matched = List[mutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + val resourceIndex = new ResourceIndex(ace, resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean = + denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[mutable.Set[String]], allowPrefixes: List[mutable.Set[String]], + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { + (allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => + literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes)))) + } + + private def allowLiteral(literalName: String, + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { + literalName match{ + case ResourcePattern.WILDCARD_RESOURCE => true + case _ => (denyLiterals.forall(denyLiterals => !denyLiterals.contains(literalName)) + && !hasDominantPrefixedDeny(literalName, denyPrefixes)) + } + } + + private def allowPrefix(prefixName: String, + denyPrefixes: List[mutable.Set[String]]): Boolean = { + !hasDominantPrefixedDeny(prefixName, denyPrefixes) + } + + private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: List[mutable.Set[String]]): Boolean = { Review comment: I was trying to share it but it seems like the different collection type btw java and scala is a headache. We'll then need some java converters or instantiate a java collection in the scala code. Do you think we do this? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org