[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545261128 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = 
matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( Review comment: `resourceIndex` => `resourceTypeKey`, Also we can omit new for ResourceTypeKey since it is a case class. 
## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType,
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544223630 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,134 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( +AclOperation.READ, +new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), +0, false, false))) Review comment: Use `logIfAllowed=true` since we are granting access in that case. ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala ## @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress +import java.util.UUID + +import kafka.security.auth.SimpleAclAuthorizer +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl.AclOperation._ +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.PatternType.LITERAL +import org.apache.kafka.common.resource.ResourceType._ +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.annotation.nowarn +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerWrapperTest extends ZooKeeperTestHarness { + @nowarn("cat=deprecation") + private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer) + 
@nowarn("cat=deprecation") + private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer) + private var resource: ResourcePattern = _ + private val superUsers = "User:superuser1; User:superuser2" + private val username = "alice" + private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1")) + private var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ + + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() + private val authorizerTestFactory = new AuthorizerTestFactory( +
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r542830098 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +151,126 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * 1. Filter out all the resource pattern corresponding to the requestContext, AclOperation, + *and ResourceType + * 2. If wildcard deny exists, return deny directly + * 3. For any literal allowed resource, if there's no dominant literal denied resource, and + *no dominant prefixed denied resource, return allow + * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow + * 5. For any other cases, return deny Review comment: Since this is the javadoc of a public API, we should move the details on how the default implementation works outside of the javadoc. We can move this list of comments inside the method. ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +151,126 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * 1. Filter out all the resource pattern corresponding to the requestContext, AclOperation, + *and ResourceType + * 2. If wildcard deny exists, return deny directly + * 3. For any literal allowed resource, if there's no dominant literal denied resource, and + *no dominant prefixed denied resource, return allow + * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow + * 5. 
For any other cases, return deny + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); Review comment: We don't currently have anything in the default implementation to support super.users right? Unlike `allow.everyone.if.no.acl.found` which is not particularly suitable for production use, `super.users` is a commonly used config that is likely to be in use in a lot of deployments. The simplest fix may be to `authorize()` with a hard-coded name and return ALLOWED if `authorize()` returns ALLOWED before any of the logic below is executed. ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ## @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539594382 ## File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; Review comment: This package is part of the public API, but the class looks like it should be internal? ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. 
Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { +var matched = List[mutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { 
Review comment: `AclEntry.WildcardPrincipalString` ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED +} else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED +} else { + super.authorizeByResourceType(requestContext, op, resourceType) +} + } + + private def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { +val resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY) +val principal = new
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533510477 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + +if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + +if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + +if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") Review comment: We should probably move this common code to SecurityUtils and use it both here and in the default implementation. ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. 
Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + +if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + +if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + +if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString + +val denyPatterns = matchingPatterns( + principal, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.DENY +) + +if (denyAll(denyPatterns)) { + logAuditMessage(requestContext, new Action(op, null,0, true, true), false, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, new Action(op, null, 0, true, true), true, false) + return AuthorizationResult.ALLOWED +} + +val allowPatterns = matchingPatterns( + principal, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.ALLOW +) + +if (allowAny(allowPatterns, denyPatterns)) { + logAuditMessage(requestContext, new Action(op,null, 0, true, true), true, false) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, new Action(op, null, 0, true, true), false, false) +AuthorizationResult.DENIED + } + + def matchingPatterns(principal: String, host: String, op: AclOperation, + resourceType: ResourceType, + permission: 
AclPermissionType): Set[ResourcePattern] = { +var resources = Set[ResourcePattern]() +for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + resourceCache.get(ace) match { +case Some(r) => resources ++= r.filter(r => r.resourceType() == resourceType) +case None => + } +} + } +} +resources + } + + private def denyAll(denyResources: Set[ResourcePattern]): Boolean = +denyResources.exists(rp => denyAll(rp)) + + private def denyAll(rp: ResourcePattern): Boolean = +rp.patternType() == PatternType.LITERAL && rp.name() == ResourcePattern.WILDCARD_RESOURCE + + private def allowAny(allowPatterns: Set[ResourcePattern], denyPatterns:
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r526479473 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +150,129 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. Review comment: We should document what this default implementation does and why a custom implementation may want to override this default. ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +150,129 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. Review comment: We should document what this default implementation does and why a custom implementation may want to override this default. ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message Review comment: We should have exactly one call to `logAuditMessage` that says whether access was allowed or denied. ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. 
Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + +if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + +if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + +if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + +val allowPatterns = matchingPatterns( + requestContext.principal().toString, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.ALLOW +) + +val denyPatterns = matchingPatterns( + requestContext.principal().toString, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.DENY Review comment: We should optimize for the case where there are no DENY acls. There is no point in finding all matching ALLOW entries in that case, we would just need to check for one ALLOW. ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -988,6 +1000,369 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { } } + @Test + def testAuthorizeAnyDurability(): Unit = { Review comment: As before, references to Durability in authorizer tests are confusing. ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.java ## @@ -0,0 +1,4 @@ +package unit.kafka.security.authorizer; + +public class AuthorizerInterfaceDefaultTest { Review comment: Are we going to add tests here? 
## File path: core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing