[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545585355 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ## @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.util.concurrent.CompletionStage +import java.{lang, util} + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before} + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + + override def authorizer: Authorizer = interfaceDefaultAuthorizer + + @Before + override def setUp(): Unit = { +super.setUp() + +val authorizers = Seq(interfaceDefaultAuthorizer.authorizer) Review comment: Yes. Remove the Seq construction and make a single class member call. commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545585037 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -988,6 +980,30 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { } } + @Test + def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = { +testAuthorizeByResourceTypeNoAclFoundOverride(aclAuthorizer) + } + + private def testAuthorizeByResourceTypeNoAclFoundOverride(authorizer: Authorizer): Unit = { +val props = TestUtils.createBrokerConfig(1, zkConnect) +props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") + +val cfg = KafkaConfig.fromProps(props) +val testAuthorizer = new AclAuthorizer Review comment: Yes. Renamed to aclAuthorizer. commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545584783 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { Review comment: commit b6a766b ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ## @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.util.concurrent.CompletionStage +import java.{lang, util} + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before} + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest { +
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545584468 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(_) => return true +case None => + } +} + } +} +false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = +denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[immutable.Set[String]], allowPrefixes: List[immutable.Set[String]], + denyLiterals: List[immutable.Set[String]], denyPrefixes: List[immutable.Set[String]]): Boolean = { +(allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => +literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes + } + + private def allowLiteral(literalName:
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545581171 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(_) => return true +case None => + } +} + } +} +false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = +denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[immutable.Set[String]], allowPrefixes: List[immutable.Set[String]], + denyLiterals: List[immutable.Set[String]], denyPrefixes: List[immutable.Set[String]]): Boolean = { +(allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => +literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes + } + + private def allowLiteral(literalName:
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545579837 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(_) => return true +case None => + } +} + } +} +false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = +denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[immutable.Set[String]], allowPrefixes: List[immutable.Set[String]], + denyLiterals: List[immutable.Set[String]], denyPrefixes: List[immutable.Set[String]]): Boolean = { +(allowPrefixes.exists(prefixes => Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545578734 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(_) => return true +case None => + } +} + } +} +false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = +denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) Review comment: Yes. Didn't realize the existence of this syntax be4. Thanks. commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545578559 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545578070 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() Review comment: Right, though it's only a list of 8. commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545577952 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545577832 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,133 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( Review comment: commit b6a766b ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,133 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( +op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), +0, true, false))) +.get(0) == AuthorizationResult.ALLOWED) { +return AuthorizationResult.ALLOWED; +} + +// Filter out all the resource pattern corresponding to the RequestContext, +// AclOperation, and ResourceType +ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( +resourceType, null, PatternType.ANY); +AclBindingFilter aclFilter = new AclBindingFilter( +resourceTypeFilter, AccessControlEntryFilter.ANY); + +EnumMap> denyPatterns = +new EnumMap>(PatternType.class) {{ +put(PatternType.LITERAL, new HashSet<>()); +put(PatternType.PREFIXED, new HashSet<>()); +}}; +EnumMap> allowPatterns = +new EnumMap>(PatternType.class) {{ +put(PatternType.LITERAL, new HashSet<>()); +put(PatternType.PREFIXED, new HashSet<>()); +}}; + +boolean hasWildCardAllow = false; + +KafkaPrincipal principal = new KafkaPrincipal( +requestContext.principal().getPrincipalType(), +requestContext.principal().getName()); +String hostAddr = requestContext.clientAddress().getHostAddress(); + +for (AclBinding binding : acls(aclFilter)) { +if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*")) +
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r54552 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,133 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( Review comment: // Check a hard-coded name to ensure that super users are granted // access regardless of DENY ACLs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545576808 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,133 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because Review comment: ![image](https://user-images.githubusercontent.com/31675100/102577221-7bc51c00-40ac-11eb-89a1-234d0af3f0fd.png) commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545576700 ## File path: clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java ## @@ -146,4 +148,32 @@ else if (capitalizeNext) { } return builder.toString(); } + +public static void authorizeByResourceTypeCheckArgs(AclOperation op, +ResourceType type) { +if (type == ResourceType.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter resource type for authorizeByResourceType"); +} + +if (type == ResourceType.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown resource type"); +} + +if (op == AclOperation.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter operation type for authorizeByResourceType"); +} + +if (op == AclOperation.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown operation type"); +} +} + +public static boolean canDenyAll(ResourcePattern pattern) { Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545576579 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging { zkClient.getVersionedAclsForResource(resource) } - private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + // Visible for benchmark + def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { +val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match { + case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => aclEntry.ace) + case None => Set.empty +} +val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => aclEntry.ace) +val acesToAdd = newAces.diff(currentAces) +val acesToRemove = currentAces.diff(newAces) + +acesToAdd.foreach(ace => { + val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType()) + resourceCache.get(resourceIndex) match { +case Some(resources) => resourceCache += (resourceIndex -> (resources + resource.name())) +case None => resourceCache += (resourceIndex -> immutable.HashSet(resource.name())) + } +}) +acesToRemove.foreach(ace => { + val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType()) Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545576394 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging { zkClient.getVersionedAclsForResource(resource) } - private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + // Visible for benchmark + def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { +val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match { + case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => aclEntry.ace) + case None => Set.empty +} +val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => aclEntry.ace) +val acesToAdd = newAces.diff(currentAces) +val acesToRemove = currentAces.diff(newAces) + +acesToAdd.foreach(ace => { + val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType()) Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545576162 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging { zkClient.getVersionedAclsForResource(resource) } - private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + // Visible for benchmark + def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { +val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match { + case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => aclEntry.ace) + case None => Set.empty Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545531049 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, Review comment: commit b6a766b ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545529534 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( Review comment: commit b6a766b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545529391 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { Review comment: resourceCache.contains ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr =
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545529131 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( Review comment: commit b6a766b228034a442e3a6e8b71ecee78eefdbfd3 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545527187 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ## @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.util.concurrent.CompletionStage +import java.{lang, util} + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before} + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + + override def authorizer: Authorizer = interfaceDefaultAuthorizer + + @Before + override def setUp(): Unit = { +super.setUp() + +val authorizers = Seq(interfaceDefaultAuthorizer.authorizer) + +// Increase maxUpdateRetries to avoid transient failures +authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue) + +val props = TestUtils.createBrokerConfig(0, zkConnect) +props.put(AclAuthorizer.SuperUsersProp, superUsers) + +config = KafkaConfig.fromProps(props) +authorizers.foreach(a => a.configure(config.originals)) + +zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Review comment: I think that the ZookeeperClient has a different metric group name. I'm not sure how the name will be used though. And yes, we can do that in a follow-up PR later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545525163 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { +logAuditMessage(requestContext, action, authorized = true) +return AuthorizationResult.ALLOWED + } else { +logAuditMessage(requestContext, action, authorized = false) +return AuthorizationResult.DENIED + } +} + +val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, authorized = false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { +var matched = List[immutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( +new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { Review comment: Right. To prevent the phantom problems. ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED +} + +val denyPrefixes = matchingResources( +
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545471749 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) Review comment: ``` public ResourcePattern(ResourceType resourceType, String name, PatternType patternType) { this.resourceType = Objects.requireNonNull(resourceType, "resourceType"); this.name = Objects.requireNonNull(name, "name"); this.patternType = Objects.requireNonNull(patternType, "patternType"); if (resourceType == ResourceType.ANY) { throw new IllegalArgumentException("resourceType must not be ANY"); } if (patternType == PatternType.MATCH || patternType == PatternType.ANY) { throw new IllegalArgumentException("patternType must not be " + patternType); } } ``` I think the ResourcePattern constructor is preventing us passing PatternType.ANY. It's only usable with Filter. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544713670 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala ## @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress +import java.util.UUID + +import kafka.security.auth.SimpleAclAuthorizer +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl.AclOperation._ +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.PatternType.LITERAL +import org.apache.kafka.common.resource.ResourceType._ +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.annotation.nowarn +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerWrapperTest extends ZooKeeperTestHarness { + @nowarn("cat=deprecation") + private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer) + @nowarn("cat=deprecation") + private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer) + private var resource: ResourcePattern = _ + private val superUsers = "User:superuser1; User:superuser2" + private val username = "alice" + private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1")) + private var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ + + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() + private val authorizerTestFactory = new AuthorizerTestFactory( +newRequestContext, addAcls, authorizeByResourceType, removeAcls) + + class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) { +override def equals(o: scala.Any): Boolean = false + } + + @Before + @nowarn("cat=deprecation") + override def setUp(): Unit = { +super.setUp() + +val props = TestUtils.createBrokerConfig(0, zkConnect) + +props.put(AclAuthorizer.SuperUsersProp, superUsers) +config = KafkaConfig.fromProps(props) +wrappedSimpleAuthorizer.configure(config.originals) + +props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") +config = KafkaConfig.fromProps(props) +wrappedSimpleAuthorizerAllowEveryone.configure(config.originals) + +resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL) +zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest") + } + + @After + override def tearDown(): Unit = { +val authorizers = Seq(wrappedSimpleAuthorizer, wrappedSimpleAuthorizerAllowEveryone) +authorizers.foreach(a => { + a.close() +}) +zooKeeperClient.close() +super.tearDown() + } + + @Test + def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = { + authorizerTestFactory.testAuthorizeByResourceTypeMultipleAddAndRemove(wrappedSimpleAuthorizer) + } Review comment: commit 092fec70a9547ec07cba999e77be1c0cf79fa275 commit e5e3d18f57ab22df20133f9841905af384d9b641 These two commits are condensing the class methods and members into the BaseAuthorizerTest. In BaseAuthorizerTest, the only abstract method is an authorizer provider. After overriding the provider, those test cases in it are sufficient to run. Now the test code looks much cleaner. If the changes look too
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544547081 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -116,38 +122,88 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); for (int aclId = 0; aclId < aclCount; aclId++) { -AccessControlEntry ace = new AccessControlEntry(principal.toString() + aclId, -"*", AclOperation.READ, AclPermissionType.ALLOW); -entries.add(new AclEntry(ace)); +// The principle in the request context we are using Review comment: commit ec80dc4e55758d83835f3ecde381a988d6dd4779 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544546946 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTestFactory.scala ## @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.security.authorizer + +import java.net.InetAddress +import java.util.UUID + +import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} +import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE} +import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} +import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.RequestContext +import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} +import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE +import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, TRANSACTIONAL_ID} +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.server.authorizer.Authorizer +import org.junit.Assert.{assertFalse, assertTrue} + +class AuthorizerTestFactory(val newRequestContext3: (KafkaPrincipal, InetAddress, ApiKeys) => RequestContext, +val addAcls: (Authorizer, Set[AccessControlEntry], ResourcePattern) => Unit, +val authorizeByResourceType: (Authorizer, RequestContext, AclOperation, ResourceType) => Boolean, +val removeAcls: (Authorizer, Set[AccessControlEntry], ResourcePattern) => Unit) { + def newRequestContext(kafkaPrincipal: KafkaPrincipal, inetAddress: InetAddress): RequestContext = +newRequestContext3(kafkaPrincipal, inetAddress, ApiKeys.PRODUCE) + + def testAuthorizeByResourceTypeMultipleAddAndRemove(authorizer: Authorizer): Unit = { +val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") +val host1 = InetAddress.getByName("192.168.1.1") +val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL) +val denyRead = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, DENY) +val allowRead = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, ALLOW) +val u1h1Context = newRequestContext(user1, host1) + +for (_ <- 1 to 10) { + assertFalse("User1 from host1 should not have READ access to any topic when no ACL exists", +authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + + addAcls(authorizer, Set(allowRead), resource1) + assertTrue("User1 from host1 now should have READ access to at least one topic", +authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + + for (_ <- 1 to 10) { +addAcls(authorizer, Set(denyRead), resource1) +assertFalse("User1 from host1 now should not have READ access to any topic", + authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + +removeAcls(authorizer, Set(denyRead), resource1) +addAcls(authorizer, Set(allowRead), resource1) +assertTrue("User1 from host1 now should have READ access to at least one topic", + authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + } + + removeAcls(authorizer, Set(allowRead), resource1) + assertFalse("User1 from host1 now should not have READ access to any topic", +authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) +} + } + + def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(authorizer: Authorizer): Unit = { +val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") +val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2") +val host1 = InetAddress.getByName("192.168.1.1") +val host2 = InetAddress.getByName("192.168.1.2") +val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL) +val resource2 = new ResourcePattern(TOPIC, "sb2" +
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544545938 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,134 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( +AclOperation.READ, Review comment: commit ec80dc4e55758d83835f3ecde381a988d6dd4779 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,134 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( +AclOperation.READ, +new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), +0, false, false))) Review comment: commit ec80dc4e55758d83835f3ecde381a988d6dd4779 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +310,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) Review comment: commit
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543114091 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { +authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { Review comment: I replied here. Maybe I shouldn't have resolved it. https://github.com/apache/kafka/pull/9485#discussion_r540706117 Since AuthorizerInterfaceDefaultTest, AclAuthorizerTest, and AuthorizerWrapperTest are sharing some test cases, we need to make this method signature abstract a bit, in order to pass the method reference to AuthorizerTestFactory. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543114091 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { +authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { Review comment: I replied here. Maybe I shouldn't have resolved it. https://github.com/apache/kafka/pull/9485#discussion_r540706117 Since AuthorizerInterfaceDefaultTest, AclAuthorizerTest, and AuthorizerWrapperTest are sharing some test cases, we need to make this method signature abstract a bit, in order to make it usable by AuthorizerTestFactory. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543817949 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: commit 62c44ade550a90671ff41bfb847e2bc28adc7baa This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543814681 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +151,126 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * 1. Filter out all the resource pattern corresponding to the requestContext, AclOperation, + *and ResourceType + * 2. If wildcard deny exists, return deny directly + * 3. For any literal allowed resource, if there's no dominant literal denied resource, and + *no dominant prefixed denied resource, return allow + * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow + * 5. For any other cases, return deny + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); Review comment: Good catch. This is super important. commit dae1a78 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543814563 ## File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; Review comment: Make ResourceTypeKey an inner class of AclAuthorizer commit 7fe92c6436432760adf9465c3f0bcf3c91104b10 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543814295 ## File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourceType; + +import java.util.Objects; + +public class ResourceIndex { Review comment: ResourceTypeKey sounds good: commit 7fe92c6436432760adf9465c3f0bcf3c91104b10 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543813486 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -100,8 +106,15 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { @After override def tearDown(): Unit = { -aclAuthorizer.close() -aclAuthorizer2.close() +val authorizers = Seq(aclAuthorizer, aclAuthorizer2) +authorizers.foreach(a => { + a.acls(AclBindingFilter.ANY).forEach(bd => { +removeAcls(aclAuthorizer, Set(bd.entry), bd.pattern()) Review comment: Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543805858 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: Use Immutable collections: Benchmark (aclCount) (denyPercentage) (resourceCount) Mode Cnt Score Error Units AclAuthorizerBenchmark.testAclsIterator 50 100 20 avgt5 4132.824 ± 2967.122 ms/op AclAuthorizerBenchmark.testAuthorizeByResourceType 50 100 20 avgt546.733 ±5.397 ms/op AclAuthorizerBenchmark.testAuthorizer 50 100 20 avgt5 6.844 ±0.915 ms/op AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 7219.696 ± 4018.189 ms/op JMH benchmarks done Use Mutable collections: AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 4927.832 ± 2570.786 ms/op When aclCount = 50, denyPercentage = 100, resourceCount = 20, the time cost is 2.3 seconds more with immutable collections. But since adding 50 * 2 ACL bindings only takes ~7 seconds, I think the performance should be acceptable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543805858 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: Use Immutable collections: Benchmark (aclCount) (denyPercentage) (resourceCount) Mode Cnt Score Error Units AclAuthorizerBenchmark.testAclsIterator 50 100 20 avgt5 4132.824 ± 2967.122 ms/op AclAuthorizerBenchmark.testAuthorizeByResourceType 50 100 20 avgt546.733 ±5.397 ms/op AclAuthorizerBenchmark.testAuthorizer 50 100 20 avgt5 6.844 ±0.915 ms/op AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 7219.696 ± 4018.189 ms/op JMH benchmarks done Use Mutable collections: AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 4927.832 ± 2570.786 ms/op When aclCount = 50, denyPercentage = 100, resourceCount = 20, the time cost is 2.3 seconds more with immutable collections. But since adding 100 * 2 ACL bindings only takes ~7 seconds, I think the performance should be acceptable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543696178 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: Use Immutable collections: Benchmark (aclCount) (denyPercentage) (resourceCount) Mode Cnt Score Error Units AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 10639.073 ± 3470.889 ms/op Use Mutable collections: AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 4927.832 ± 2570.786 ms/op The time cost doubled with immutable collections. But since adding 100 * 2 ACL bindings only takes 10 seconds, adding 2 ACL binding takes only 100 ms on average, which should be acceptable. ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: Use Immutable collections: Benchmark (aclCount) (denyPercentage) (resourceCount) Mode Cnt Score Error Units AclAuthorizerBenchmark.testAclsIterator 50 100 20 avgt5 4132.824 ± 2967.122 ms/op AclAuthorizerBenchmark.testAuthorizeByResourceType 50 100 20 avgt546.733 ±5.397 ms/op AclAuthorizerBenchmark.testAuthorizer 50 100 20 avgt5 6.844 ±0.915 ms/op AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 7219.696 ± 4018.189 ms/op JMH benchmarks done Use Mutable collections: AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 4927.832 ± 2570.786 ms/op The time cost is 2.3 seconds more. with immutable collections. But since adding 100 * 2 ACL bindings only takes ~7 seconds, I think the performance should be acceptable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543745352 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ## @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before, Test} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + private val superUsers = "User:superuser1; User:superuser2" Review comment: Test added for AuthorizerInterfaceDefaultTest, AclAuthorizerTest, AuthorizerWrapperTest. commit dae1a788b70ebc03eab265b1027a4b43ad8e773b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543744182 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -175,4 +185,39 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED +} else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED +} else { + super.authorizeByResourceType(requestContext, op, resourceType) +} Review comment: Good catch. This is super important. commit dae1a788b70ebc03eab265b1027a4b43ad8e773b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543744087 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,131 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString Review comment: Good catch. This is super important. commit dae1a788b70ebc03eab265b1027a4b43ad8e773b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543696178 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: Use Immutable collections: Benchmark (aclCount) (denyPercentage) (resourceCount) Mode Cnt Score Error Units AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 10639.073 ± 3470.889 ms/op Use Mutable collections: AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 4927.832 ± 2570.786 ms/op The time cost doubled with immutable collections. But since adding 100 * 2 ACL bindings only takes 10 seconds, adding 2 ACL binding takes only 100 ms, which should be acceptable. ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: Use Immutable collections: Benchmark (aclCount) (denyPercentage) (resourceCount) Mode Cnt Score Error Units AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 10639.073 ± 3470.889 ms/op Use Mutable collections: AclAuthorizerBenchmark.testUpdateCache 50 100 20 avgt5 4927.832 ± 2570.786 ms/op The time cost doubled with immutable collections. But since adding 100 * 2 ACL bindings only takes 10 seconds, adding 2 ACL binding takes only 100 ms on average, which should be acceptable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543174139 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: I tested a bit, using 1 bg thread adding and removing elements to a mutable.HashSet while the main thread constantly iterating the HashSet using "foreach". The "foreach" call doesn't throw any exception. But I'm a bit unsure what would happen if the iteration hits a bucket where some elements are being added to or deleted from. Let me test what's the overhead using the immutable map. I'd prefer this approach as we're expecting much more READ than WRITE to the hashset. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543174139 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: I tested a bit, using 1 bg thread adding and removing elements to a mutable.HashSet while the main thread constantly iterating the HashSet using "foreach". The "foreach" call doesn't throw any exception. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543168812 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: I tested a bit, using 1 bg thread adding elements in order and removing those added in order while the main thread query the HashSet using "foreach". The elements in the query result are not continous, which means the concurrent R/W will destroy the "foreach" completely. Let me test the performance of the immutable map a bit. I prefer this approach since it won't lock the read operation, which is much more frequent. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543168812 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: I tested a bit, using 1 bg thread adding elements in order and removing those added in order while the main thread query the HashSet using "foreach". The elements in the query result are not in order, which means the concurrent R/W will destroy the "foreach" completely. Let me test the performance of the immutable map a bit. I prefer this approach since it won't lock the read operation, which is much more frequent. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543134773 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, +scala.collection.mutable.HashSet[String]]() Review comment: Would scala "foreach" throw any exception when READ operation races with WRITE in HashMap / HashSet? If not, I think we can tolerate some READ inconsistency as ZK is also broadcasting the ACL changes asynchronously to brokers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543120808 ## File path: core/src/test/scala/unit/kafka/security/authorizer/DelegateAuthorizer.scala ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.{lang, util} +import java.util.concurrent.CompletionStage + +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} +import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, AuthorizerServerInfo} + +/** + * For testing the interface default + */ +class DelegateAuthorizer extends Authorizer { Review comment: Make sense. commit e31f157eaac1213445dd284fd2209a29f4fa18fd This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543118616 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { +authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) } authorizer.createAcls(requestContext, bindings.toList.asJava).asScala .map(_.toCompletableFuture.get) .foreach { result => result.exception.ifPresent { e => throw e } } +aclAdded += Tuple3(authorizer, aces, resourcePattern) Review comment: Removed as we are not removing ACLs in teadDown() anymore. commit 825a8ba This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543117875 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -74,6 +75,10 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { private var config: KafkaConfig = _ private var zooKeeperClient: ZooKeeperClient = _ + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() Review comment: Removed as we are not removing ACLs in teadDown() anymore. commit 825a8ba77ad1766f998a71a9a15f21e73daad84a ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { Review comment: commit 825a8ba77ad1766f998a71a9a15f21e73daad84a ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ## @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before, Test} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + private val superUsers = "User:superuser1; User:superuser2" + private val username = "alice" + private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1")) + private var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() + private val authorizerTestFactory = new AuthorizerTestFactory( +newRequestContext, addAcls, authorizeByResourceType, removeAcls) + + class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) { +override def equals(o: scala.Any): Boolean = false + } + + @Before + override def setUp(): Unit = { +super.setUp() + +val authorizers = Seq(interfaceDefaultAuthorizer.authorizer) + +// Increase maxUpdateRetries to avoid transient failures +authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue) + +val props = TestUtils.createBrokerConfig(0, zkConnect) +props.put(AclAuthorizer.SuperUsersProp, superUsers) + +config = KafkaConfig.fromProps(props) +authorizers.foreach(a => a.configure(config.originals)) + +zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "AclAuthorizerTest") + } + + @After + override def tearDown(): Unit = { +val authorizers = Seq(interfaceDefaultAuthorizer) +authorizers.foreach(a => { + a.acls(AclBindingFilter.ANY).forEach(bd => { +removeAcls(interfaceDefaultAuthorizer, Set(bd.entry), bd.pattern()) Review comment: commit 825a8ba77ad1766f998a71a9a15f21e73daad84a
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543116084 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { Review comment: For this method, I changed the signature back to AclAuthorzier as the AuthorizerTestFactory is not depending on it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543114091 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { +authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { Review comment: I replied here. Maybe I shouldn't have resolved it. https://github.com/apache/kafka/pull/9485#discussion_r540706117 Since AuthorizerInterfaceDefaultTest, AclAuthorizerTest, and AuthorizerWrapperTest are sharing some test utils, we need to make this method signature abstract a bit, in order to make it usable by AuthorizerTestFactory. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543099934 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -175,4 +185,39 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED +} else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED +} else { + super.authorizeByResourceType(requestContext, op, resourceType) +} + } + + private def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { +val resourceTypeFilter = new ResourcePatternFilter( + resourceType, Resource.WildCardResource, PatternType.LITERAL) +val principal = new KafkaPrincipal(requestContext.principal.getPrincipalType, requestContext.principal.getName) +val host = requestContext.clientAddress().getHostAddress +val accessControlEntry = new AccessControlEntryFilter(null, null, op, AclPermissionType.DENY) +val aclFilter = new AclBindingFilter(resourceTypeFilter, accessControlEntry) + +acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), principal, host)) + } + + private def principalHostMatch(ace: AccessControlEntry, + principal: KafkaPrincipal, + host: String): Boolean = { +((ace.host() == AclEntry.WildcardHost || ace.host() == host) + && (ace.principal() == AclEntry.WildcardPrincipalString || ace.principal() == principal.toString)) Review comment: Yeah. I was trying to restrict the type in order to remind people to construct a KafkaPrinciple first. But toString() is an expensive operation. commit 16576f85a858648cfc4ff882b554ddc65922021c This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have three approaches here: 1. use .getClass 2. use .isInstanceOf 3. only configure the property with the key "AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp" in the AuthorizerWrapper instance construction so no other property will get in. Neither of them is perfect but approach 2 also seems better to me. commit 1217394c0c3767ac11df958c02a681c8cbc8382b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have three approaches here: 1. use .getClass 2. use .isInstanceOf 3. only configure the property with the key "AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp" in the AuthorizerWrapper instance construction so no other property will get constructed. Neither of them is perfect but approach 2 also seems better to me. commit 1217394c0c3767ac11df958c02a681c8cbc8382b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have three approaches here: 1. use .getClass 2. use .isInstanceOf 3. only configure the property with the key "AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp" in the AuthorizerWrapper class instance object construction so no other property will get constructed. Neither of them is perfect but approach 2 also seems better to me. commit 1217394c0c3767ac11df958c02a681c8cbc8382b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have three approaches here: 1. use .getClass 2. use .isInstanceOf 3. only configure the property with the key "AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp" in the class instance object construction so no other property will get constructed. Neither of them is perfect but approach 2 also seems better to me. commit 1217394c0c3767ac11df958c02a681c8cbc8382b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have three approaches here: 1. use .getClass 2. use .isInstanceOf 3. only configure the property with the key "AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp" in the class object construction so no other property will get constructed. Neither of them is perfect but approach 2 also seems better to me. commit 1217394c0c3767ac11df958c02a681c8cbc8382b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have two approaches here: 1. use .getClass 2. use .isInstanceOf 3. only configure the property with the key "AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp" in the class object construction so no other property will get constructed. Neither of them is perfect but approach 2 also seems better to me. commit 1217394c0c3767ac11df958c02a681c8cbc8382b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have two approaches here: 1. use .getClass 2. use .isInstanceOf Neither of them is perfect but approach 2 also seems better to me. commit 1217394c0c3767ac11df958c02a681c8cbc8382b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543087570 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have two approaches here: 1. use baseAuthorizer.getClass == classOf[SimpleAclAuthorizer] 2. use baseAuthorizer.isInstanceOf[SimpleAclAuthorizer] They are neither perfect. Approach 2 also seems better to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543087570 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: So we have two approaches here: 1. use baseAuthorizer.getClass == classOf[SimpleAclAuthorizer] 2. use baseAuthorizer.isInstanceOf[SimpleAclAuthorizer] They are neither perfect. Approach 2 also seems better to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543087570 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: I think "isInstanceOf" will still return true if the `baseAuthorizer` is a subclass extending SimpleAclAuthorizer. class Father { } class Son extends Father { } val father = new Father val son = new Son println(son.isInstanceOf[Father]) println(son.getClass == classOf[Father]) println(son.getClass == classOf[Son]) > true >false >true This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543087570 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( +new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), +Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: I think "isInstanceOf" will still return true if the `baseAuthorizer` is a subclass extending SimpleAclAuthorizer. class Father { } class Son extends Father { } val father = new Father val son = new Son println(son.isInstanceOf[Father]) println(son.getClass == classOf[Father]) println(son.getClass == classOf[Son]) >>> true >>>false >>>true This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r543080612 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +151,126 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * 1. Filter out all the resource pattern corresponding to the requestContext, AclOperation, + *and ResourceType + * 2. If wildcard deny exists, return deny directly + * 3. For any literal allowed resource, if there's no dominant literal denied resource, and + *no dominant prefixed denied resource, return allow + * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow + * 5. For any other cases, return deny Review comment: Sure. commit 25e0bfcc97f956ceb4254ab8c457fe5d8d250e82 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r540418386 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +73,19 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) Review comment: Given that we probably don't want to change the deprecated Authorizer interface, I can only think of one way to achieve this: Besides checking if the `AllowEveryoneIfNoAclIsFoundProp` exists and if it equals to `true`, I added another check to authorize on a hardcoded session, operation, and resource. Since configure() will be called immediately after the authorizer instantiation, it's guaranteed that no ACLs would exist when we do this check. override def configure(configs: util.Map[String, _]): Unit = { ..baseAuthorizer.configure(configs) shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( ..AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) && baseAuthorizer.authorize( ..new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), Read, new Resource(Topic, "hi", PatternType.LITERAL))) } commit 2ed79a0a7788f8841475badfd1c26adf0eb3435c This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r542039322 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: Benchmark result: https://paste.ubuntu.com/p/zvjZC4QkMM/ Performance pattern doesn't change, except `testUpdateCache` runs much faster now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r542039322 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: Benchmark result: https://paste.ubuntu.com/p/zvjZC4QkMM/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { Review comment: Because we don't wanna reconstruct a new large set containing all the matching resources. We are constructing a List of ~ 3 * 3 * 3 elements which refer to existing HashSets maintained by `updateCache`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { Review comment: Because we don't wanna reconstruct a new large set containing all the matching resources. We are constructing a List of ~ 3 * 3 * 3 elements which refer to existing HashSets maintaining by `updateCache`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { Review comment: Because we don't wanna reconstruct a new large set containing all the matching resources. We are constructing a List of ~ 3 * 3 * 3 elements, which refers to existing HashSets maintaining by `updateCache`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { Review comment: Because we won't reconstruct a new large set containing all the matching resources. We are constructing a List of ~ 3 * 3 * 3 elements, which refers to existing HashSets maintaining by `updateCache`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541779975 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1117,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { +authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) } authorizer.createAcls(requestContext, bindings.toList.asJava).asScala .map(_.toCompletableFuture.get) .foreach { result => result.exception.ifPresent { e => throw e } } +aclAdded += Tuple3(authorizer, aces, resourcePattern) } - private def removeAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = { + private def removeAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = { Review comment: Reverted other test changes commit 4f9b79a810c4da3030fe262d4bfdc97df4945e8c This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539787542 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { +var matched = List[mutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + val resourceIndex = new ResourceIndex(ace, resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean = +denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[mutable.Set[String]], allowPrefixes: List[mutable.Set[String]], + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { +(allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => +literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes + } + + private def allowLiteral(literalName: String, + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { +literalName match{ + case ResourcePattern.WILDCARD_RESOURCE => true + case _ => (denyLiterals.forall(denyLiterals => !denyLiterals.contains(literalName)) +&& !hasDominantPrefixedDeny(literalName, denyPrefixes)) +} + } + + private def allowPrefix(prefixName: String, + denyPrefixes: List[mutable.Set[String]]): Boolean = { +!hasDominantPrefixedDeny(prefixName, denyPrefixes) + } + + private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: List[mutable.Set[String]]): Boolean = { Review comment: I was trying to share it but it seems like the different collection type btw java and scala is a headache. We'll then need some java converters or instantiate a java collection in the scala code. Do you think it deserves this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { Review comment: I think that we'd want to prevent constructing extremely large hash sets and keep the size in a threshold < 100k, in order to prevent the long wait for resizing. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: The existing benchmark does not have any DENY resource in it. Adding some DENY bindings whose percentage is controlled by parameters will be an improvement to the existing benchmark and help us understand the performance better. I've reverted all changes other than adding some DENY bindings. Also, I moved those memory intense operations into the @setup phase so now the benchmark just measures updateCache(). Does the benchmark look good to you now? commit 6536cea788210860a764f3f0a6901244e8d974fe @rajinisivaram This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: The existing benchmark does not have any DENY resource in it. Adding some DENY bindings whose percentage is controlled by parameters will be an improvement to the existing benchmark and help us understand the performance better. I've reverted all changes other than adding some DENY bindings. Does the benchmark look good to you now? commit 6536cea788210860a764f3f0a6901244e8d974fe @rajinisivaram This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: The existing benchmark does not have any DENY resource in it. Adding some DENY bindings whose percentage is controlled by parameters will be an improvement to the existing benchmark and help us understand the performance better. I've reverted all changes other than adding some DENY bindings. Does the benchmark look good to you now? @rajinisivaram This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: The existing benchmark does not have any DENY resource in it. Adding some DENY resources whose percentage is controlled by parameters will be an improvement to the existing benchmark and help us understand the performance better. I've reverted all changes other than adding some DENY bindings. Does the benchmark look good to you now? @rajinisivaram This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: The existing benchmark does not have any DENY resource in it. Adding some DENY resources whose percentage is controlled by parameters will be an improvement to the existing benchmark and help us understand the performance better. I've reverted all changes other than adding some DENY resource. Does the benchmark look good to you now? @rajinisivaram This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: For this line, since the benchmark is creating the random resource name in several places, I'm trying to build a unified way to build it. The existing benchmark does not have any DENY resource in it. Adding some DENY resources whose percentage is controlled by parameters will be an improvement to the existing benchmark and help us understand the performance better. In addition, I think adding different types of patterns would also help. What do you think? @rajinisivaram This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex field.set(obj, value); } -private TreeMap prepareAclCache() { +private void prepareAclCache() throws UnknownHostException { Map> aclEntries = new HashMap<>(); for (int resourceId = 0; resourceId < resourceCount; resourceId++) { ResourcePattern resource = new ResourcePattern( (resourceId % 10 == 0) ? ResourceType.GROUP : ResourceType.TOPIC, -resourceNamePrefix + resourceId, +resourceName(resourceNamePrefix), Review comment: For this line, since the benchmark is creating the random resource name in several places, I'm trying to build a unified way to build it. I think the existing benchmark does not have any DENY resource in it. Adding some DENY resources whose percentage is controlled by parameters will be an improvement to the existing benchmark and help us understand the performance better. In addition, I think adding different types of patterns would also help. What do you think? @rajinisivaram This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541206648 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -69,33 +73,44 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AclAuthorizerBenchmark { -@Param({"1", "5", "20"}) +@Param({"20"}) Review comment: Yes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541206048 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -69,33 +73,44 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AclAuthorizerBenchmark { -@Param({"1", "5", "20"}) +@Param({"20"}) private int resourceCount; //no. of. rules per resource -@Param({"10", "50"}) +@Param({"50"}) Review comment: Yes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r541202572 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -69,33 +73,44 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AclAuthorizerBenchmark { -@Param({"1", "5", "20"}) +@Param({"20"}) private int resourceCount; //no. of. rules per resource -@Param({"10", "50"}) +@Param({"50"}) private int aclCount; +@Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"}) +private double denyPercentage; + private final int hostPreCount = 1000; private final String resourceNamePrefix = "foo-bar35_resource-"; +private final String resourceName = resourceNamePrefix + 95; private final AclAuthorizer aclAuthorizer = new AclAuthorizer(); private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); private List actions = new ArrayList<>(); private RequestContext context; +private TreeMap aclCache = new TreeMap<>(new AclAuthorizer.ResourceOrdering()); +private scala.collection.mutable.HashMap> resourceCache = +new scala.collection.mutable.HashMap<>(); +Random rand = new Random(System.currentTimeMillis()); +double eps = 1e-9; + @Setup(Level.Trial) public void setup() throws Exception { -setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("aclCache").getName(), -prepareAclCache()); +prepareAclCache(); +setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("aclCache").getName(), aclCache); +setFieldValue(aclAuthorizer, AclAuthorizer.class.getDeclaredField("resourceCache").getName(), resourceCache); // By adding `-95` to the resource name prefix, we cause the `TreeMap.from/to` call to return // most map entries. In such cases, we rely on the filtering based on `String.startsWith` // to return the matching ACLs. Using a more efficient data structure (e.g. a prefix // tree) should improve performance significantly). actions = Collections.singletonList(new Action(AclOperation.WRITE, -new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95, PatternType.LITERAL), +new ResourcePattern(ResourceType.TOPIC, resourceName, PatternType.LITERAL), Review comment: Yeah. I was doing resourceName = resourceName + 95 to re-use this variable. We can revert it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r540706117 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -1040,19 +1117,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { +authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) } authorizer.createAcls(requestContext, bindings.toList.asJava).asScala .map(_.toCompletableFuture.get) .foreach { result => result.exception.ifPresent { e => throw e } } +aclAdded += Tuple3(authorizer, aces, resourcePattern) } - private def removeAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = { + private def removeAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = { Review comment: Since `AuthorizerInterfaceDefaultTest`, `AclAuthorizerTest`, and `AuthorizerWrapperTest` are sharing some test utils, we need to make this method signature abstract a bit, in order to make it usable by AuthorizerTestFactory. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r540705023 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ## @@ -100,8 +106,15 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { @After override def tearDown(): Unit = { -aclAuthorizer.close() -aclAuthorizer2.close() +val authorizers = Seq(aclAuthorizer, aclAuthorizer2) +authorizers.foreach(a => { + a.acls(AclBindingFilter.ANY).forEach(bd => { +removeAcls(aclAuthorizer, Set(bd.entry), bd.pattern()) Review comment: Otherwise "deny all" will remain in ZK during the whole test process since ZK won't be restarted or re-instantiated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r540658692 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED +} else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED +} else { + super.authorizeByResourceType(requestContext, op, resourceType) +} + } + + private def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { Review comment: commit 8263bd3 changed the AuthorizerWrapper logic and optimized the performance a bit. Now AuthorizerWrapper#denyAllResource will 1. only use Authorizer#acls() to filter out the `WildcardResource` with the pattern type `LITERAL`. 2. check if any of the filtered out bindings match the `request principle` and `request host`. So it's behavior diverges more from the interface default now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r540655651 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED +} else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED +} else { + super.authorizeByResourceType(requestContext, op, resourceType) +} + } + + private def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { +val resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY) +val principal = new KafkaPrincipal(requestContext.principal.getPrincipalType, requestContext.principal.getName) +val accessControlEntry = new AccessControlEntryFilter( + principal.toString, requestContext.clientAddress().getHostAddress, op, AclPermissionType.DENY) Review comment: Good catch. commit 8263bd319f63d39808f90129db55427b98385dd4 Since it's a bit hard to test `AllowAnyoneIfNoAclFound` and many other logics in the integration test, I added a new test class `AuthorizerWrapperTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r540655651 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED +} else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED +} else { + super.authorizeByResourceType(requestContext, op, resourceType) +} + } + + private def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { +val resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY) +val principal = new KafkaPrincipal(requestContext.principal.getPrincipalType, requestContext.principal.getName) +val accessControlEntry = new AccessControlEntryFilter( + principal.toString, requestContext.clientAddress().getHostAddress, op, AclPermissionType.DENY) Review comment: Good catch. commit 83d1f8840933a28934f261ca6affc820a8ececd5 Since it's a bit hard to test `AllowAnyoneIfNoAclFound` and many other logics in the integration test, I added a new test class `AuthorizerWrapperTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r540418386 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -71,15 +73,19 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { -Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) +Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) +shouldAllowEveryoneIfNoAclIsFound = configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) Review comment: Given that we probably don't want to change the deprecated Authorizer interface, I can only think of one way to achieve this: Besides checking if the `AllowEveryoneIfNoAclIsFoundProp` exists and if it equals to `true`, I added another check to authorize on a hardcoded session, operation, and resource. Since configure() will be called immediately after the authorizer instantiation, it's guaranteed that no ACLs would exist when we do this check. override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) && baseAuthorizer.authorize( new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), Read, new Resource(Topic, "hi", PatternType.LITERAL))) } commit 2ed79a0a7788f8841475badfd1c26adf0eb3435c This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539787542 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { +var matched = List[mutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + val resourceIndex = new ResourceIndex(ace, resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean = +denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[mutable.Set[String]], allowPrefixes: List[mutable.Set[String]], + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { +(allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => +literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes + } + + private def allowLiteral(literalName: String, + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { +literalName match{ + case ResourcePattern.WILDCARD_RESOURCE => true + case _ => (denyLiterals.forall(denyLiterals => !denyLiterals.contains(literalName)) +&& !hasDominantPrefixedDeny(literalName, denyPrefixes)) +} + } + + private def allowPrefix(prefixName: String, + denyPrefixes: List[mutable.Set[String]]): Boolean = { +!hasDominantPrefixedDeny(prefixName, denyPrefixes) + } + + private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: List[mutable.Set[String]]): Boolean = { Review comment: I was trying to share it but it seems like the different collection type btw java and scala is a headache. We'll then need some java converters or instantiate a java collection in the scala code. Do you think we do this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { Review comment: I was thinking that we'd want to prevent constructing extremely large hash sets in order to prevent the long wait for resizing. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539751258 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) Review comment: Yes. commit 1dc143fc78a3b9927189751255346ef0b6cafd90 if (noDeny) { ..if (hasAllow) { return Authorize.ALLOWED ..} else { return Authorize.DENIED // since no allow exists ..} } This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539751258 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) Review comment: Yes. commit 1dc143fc78a3b9927189751255346ef0b6cafd90 if (noDeny) { if (hasAllow) { return Authorize.ALLOWED } else { return Authorize.DENIED // since no allow exists } } This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539737287 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -164,4 +206,28 @@ public void testAclsIterator() { public void testAuthorizer() { aclAuthorizer.authorize(context, actions); } + +@Benchmark +public void testAuthorizeByResourceType() { +aclAuthorizer.authorizeByResourceType(context, AclOperation.WRITE, ResourceType.TOPIC); +} + +@Benchmark +public void testUpdateCache() { +AclAuthorizer aclAuthorizer = new AclAuthorizer(); +scala.collection.mutable.Set entries = new scala.collection.mutable.HashSet<>(); +for (int i = 0; i < resourceCount; i ++){ +scala.collection.immutable.Set immutable = new scala.collection.immutable.HashSet<>(); +for (int j = 0; j < aclCount; j++) { +entries.add(new AclEntry(new AccessControlEntry( +principal.toString(), "127.0.0" + j, AclOperation.WRITE, AclPermissionType.ALLOW))); +immutable = entries.toSet(); +} +aclAuthorizer.updateCache( Review comment: Sure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539737110 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { +var matched = List[mutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { Review comment: commit 2fd4babe2c27ee0723fa1cd720ca35d2bbefe57b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539736828 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString +val host = requestContext.clientAddress().getHostAddress +val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) +val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) +val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + +if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, action, false) +AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, +resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { +var matched = List[mutable.HashSet[String]]() +for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + val resourceIndex = new ResourceIndex(ace, resourceType, patternType) + resourceCache.get(resourceIndex) match { +case Some(resources) => matched = matched :+ resources +case None => + } +} + } +} +matched + } + + def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean = Review comment: Yes. commit 2fd4babe2c27ee0723fa1cd720ca35d2bbefe57b This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539734983 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare Review comment: Yeah. Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r539734315 ## File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourceType; + +import java.util.Objects; + +public class ResourceIndex { Review comment: I used index as it's used as the index of the hashmap. What about something like `ResourceNameFilter`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org