[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545585355



##
File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+
+  override def authorizer: Authorizer = interfaceDefaultAuthorizer
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    val authorizers = Seq(interfaceDefaultAuthorizer.authorizer)

Review comment:
   Yes. Removed the Seq construction and made a single class member call.
   commit b6a766b





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545585037



##
File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -988,6 +980,30 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
 }
   }
 
+  @Test
+  def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
+    testAuthorizeByResourceTypeNoAclFoundOverride(aclAuthorizer)
+  }
+
+  private def testAuthorizeByResourceTypeNoAclFoundOverride(authorizer: Authorizer): Unit = {
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+
+    val cfg = KafkaConfig.fromProps(props)
+    val testAuthorizer = new AclAuthorizer

Review comment:
   Yes. Renamed to aclAuthorizer. 
   commit b6a766b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545584783



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {

Review comment:
   commit b6a766b

##
File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
+

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545584468



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(_) => return true
+case None =>
+  }
+}
+  }
+}
+false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[immutable.Set[String]], 
allowPrefixes: List[immutable.Set[String]],
+   denyLiterals: List[immutable.Set[String]], 
denyPrefixes: List[immutable.Set[String]]): Boolean = {
+(allowPrefixes.exists(prefixes =>
+  prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes)))
+  || allowLiterals.exists(literals =>
+literals.exists(literal => allowLiteral(literal, denyLiterals, 
denyPrefixes
+  }
+
+  private def allowLiteral(literalName: 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545581171



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(_) => return true
+case None =>
+  }
+}
+  }
+}
+false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[immutable.Set[String]], 
allowPrefixes: List[immutable.Set[String]],
+   denyLiterals: List[immutable.Set[String]], 
denyPrefixes: List[immutable.Set[String]]): Boolean = {
+(allowPrefixes.exists(prefixes =>
+  prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes)))
+  || allowLiterals.exists(literals =>
+literals.exists(literal => allowLiteral(literal, denyLiterals, 
denyPrefixes
+  }
+
+  private def allowLiteral(literalName: 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545579837



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(_) => return true
+case None =>
+  }
+}
+  }
+}
+false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[immutable.Set[String]], 
allowPrefixes: List[immutable.Set[String]],
+   denyLiterals: List[immutable.Set[String]], 
denyPrefixes: List[immutable.Set[String]]): Boolean = {
+(allowPrefixes.exists(prefixes =>

Review comment:
   commit b6a766b






[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545578734



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(_) => return true
+case None =>
+  }
+}
+  }
+}
+false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))

Review comment:
   Yes. I didn't know this syntax existed before. Thanks.
   commit b6a766b
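
The reviewer's suggestion is not quoted in this message, so the exact syntax being referred to is unknown. One plausible reading, offered only as an assumption, is Scala's placeholder syntax for the lambda in `denyAll`. A minimal sketch, with plain `Set[String]` standing in for `immutable.HashSet[String]` and a local constant standing in for `ResourcePattern.WILDCARD_RESOURCE`:

```scala
object DenyAllSketch {
  // Stand-in for ResourcePattern.WILDCARD_RESOURCE (the "*" wildcard name).
  val WildcardResource = "*"

  // Explicit lambda, as written in the quoted diff.
  def denyAllExplicit(denyLiterals: List[Set[String]]): Boolean =
    denyLiterals.exists(r => r.contains(WildcardResource))

  // Same predicate with placeholder syntax (assumed to be the suggested form).
  def denyAllPlaceholder(denyLiterals: List[Set[String]]): Boolean =
    denyLiterals.exists(_.contains(WildcardResource))
}
```

Both forms behave identically; the second only drops the named parameter.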









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545578559



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {

Review comment:
   commit b6a766b
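
The review remark on these nested loops is not quoted here. As a hedged illustration only, the lookup in `hasMatchingResources` can be written without a non-local `return` by nesting `exists` calls. The sketch below is not the code from commit b6a766b: the `Key` tuple, the string constants, and the `Map` cache are hypothetical simplifications of `ResourceTypeKey`, `AclEntry`, and `resourceCache`.

```scala
object HasMatchingResourcesSketch {
  // Hypothetical simplified cache key: (principal, host, operation).
  type Key = (String, String, String)

  // Stand-ins for AclEntry.WildcardPrincipalString, AclEntry.WildcardHost, AclOperation.ALL.
  val WildcardPrincipal = "User:*"
  val WildcardHost = "*"
  val AllOps = "ALL"

  // True as soon as any of the up to 2 x 2 x 2 key combinations is cached.
  def hasMatchingResources(cache: Map[Key, Set[String]],
                           principal: String, host: String, op: String): Boolean =
    Set(principal, WildcardPrincipal).exists { p =>
      Set(host, WildcardHost).exists { h =>
        Set(op, AllOps).exists(o => cache.contains((p, h, o)))
      }
    }
}
```

`exists` short-circuits on the first hit, so the early-exit behavior of the original loop is preserved.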









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545578070



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()

Review comment:
   Right, though it's only a list of 8.
   commit b6a766b
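
For readers following the "list of 8" remark: the lookup in `matchingResources` probes every combination of (principal or wildcard principal) x (host or wildcard host) x (operation or ALL), so at most 2 * 2 * 2 = 8 cache keys are consulted and the resulting list cannot grow beyond 8 entries. The sketch below only illustrates that count. It is written in Java for brevity, the class name `LookupKeys` and the string key format are hypothetical, and the wildcard values are placeholders for the real `AclEntry` and `AclOperation` constants.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class LookupKeys {
    // Placeholder wildcard values (assumptions for this sketch); the real
    // constants are defined by AclEntry and AclOperation.
    static final String WILDCARD_PRINCIPAL = "User:*";
    static final String WILDCARD_HOST = "*";
    static final String ALL_OPERATIONS = "ALL";

    // Enumerates every (principal, host, operation) combination a lookup would probe.
    static List<String> candidateKeys(String principal, String host, String operation) {
        // One set per dimension, so a caller that already passes a wildcard yields fewer keys.
        Set<String> principals = new LinkedHashSet<>(List.of(principal, WILDCARD_PRINCIPAL));
        Set<String> hosts = new LinkedHashSet<>(List.of(host, WILDCARD_HOST));
        Set<String> operations = new LinkedHashSet<>(List.of(operation, ALL_OPERATIONS));

        List<String> keys = new ArrayList<>();
        for (String p : principals)
            for (String h : hosts)
                for (String o : operations)
                    keys.add(p + "|" + h + "|" + o);
        return keys;
    }
}
```

Because each dimension is a set, a request that already arrives with a wildcard value (for example host `*`) probes 4 keys rather than 8.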









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545577952



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {

Review comment:
   commit b6a766b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545577832



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,133 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+ *which is a CPU intense work.
+ * 2. The interface default rebuild several sets of strings, which is a 
memory intense work.
+ * 3. The interface default cannot perform the audit logging properly
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+if (authorize(requestContext, Collections.singletonList(new Action(

Review comment:
   commit b6a766b

##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,133 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * It is important to override this interface default in implementations because
+     * 1. The interface default iterates all AclBindings multiple times, without any indexing,
+     *    which is a CPU intense work.
+     * 2. The interface default rebuild several sets of strings, which is a memory intense work.
+     * 3. The interface default cannot perform the audit logging properly
+     *
+     * @param requestContext Request context including request resourceType, security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+     *                       given ACL operation on at least one resource of the given type.
+     *                       Return {@link AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        if (authorize(requestContext, Collections.singletonList(new Action(
+                op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
+                0, true, false)))
+                .get(0) == AuthorizationResult.ALLOWED) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        // Filter out all the resource pattern corresponding to the RequestContext,
+        // AclOperation, and ResourceType
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        EnumMap<PatternType, Set<String>> denyPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+        EnumMap<PatternType, Set<String>> allowPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+
+        boolean hasWildCardAllow = false;
+
+        KafkaPrincipal principal = new KafkaPrincipal(
+            requestContext.principal().getPrincipalType(),
+            requestContext.principal().getName());
+        String hostAddr = requestContext.clientAddress().getHostAddress();
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*"))
+

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r54552



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,133 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+ *which is a CPU intense work.
+ * 2. The interface default rebuild several sets of strings, which is a 
memory intense work.
+ * 3. The interface default cannot perform the audit logging properly
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+if (authorize(requestContext, Collections.singletonList(new Action(

Review comment:
   // Check a hard-coded name to ensure that super users are granted
   // access regardless of DENY ACLs.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545576808



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,133 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because

Review comment:
   
![image](https://user-images.githubusercontent.com/31675100/102577221-7bc51c00-40ac-11eb-89a1-234d0af3f0fd.png)
   commit b6a766b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545576700



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
##
@@ -146,4 +148,32 @@ else if (capitalizeNext) {
         }
         return builder.toString();
     }
+
+    public static void authorizeByResourceTypeCheckArgs(AclOperation op,
+                                                        ResourceType type) {
+        if (type == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for authorizeByResourceType");
+        }
+
+        if (type == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+    }
+
+    public static boolean canDenyAll(ResourcePattern pattern) {

Review comment:
   commit b6a766b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545576579



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging {
     zkClient.getVersionedAclsForResource(resource)
   }
 
-  private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
+  // Visible for benchmark
+  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
+    val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match {
+      case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => aclEntry.ace)
+      case None => Set.empty
+    }
+    val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => aclEntry.ace)
+    val acesToAdd = newAces.diff(currentAces)
+    val acesToRemove = currentAces.diff(newAces)
+
+    acesToAdd.foreach(ace => {
+      val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType())
+      resourceCache.get(resourceIndex) match {
+        case Some(resources) => resourceCache += (resourceIndex -> (resources + resource.name()))
+        case None => resourceCache += (resourceIndex -> immutable.HashSet(resource.name()))
+      }
+    })
+    acesToRemove.foreach(ace => {
+      val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType())

Review comment:
   commit b6a766b
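
The hunk above keeps the per-entry index in sync by diffing the old and new entry sets for a resource rather than rebuilding the index. A minimal sketch of that idea follows, written in Java with plain strings standing in for `AccessControlEntry` and `ResourceTypeKey`. The class `ResourceIndex` and its methods are hypothetical, and the removal branch is an assumption, since the quoted hunk is cut off before it shows how removals are applied.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ResourceIndex {
    // Hypothetical index: entry key -> names of the resources that carry that entry.
    private final Map<String, Set<String>> index = new HashMap<>();

    // Applies the change from currentEntries to newEntries for a single resource.
    void update(String resourceName, Set<String> currentEntries, Set<String> newEntries) {
        Set<String> toAdd = new HashSet<>(newEntries);
        toAdd.removeAll(currentEntries);      // entries present only in the new set
        Set<String> toRemove = new HashSet<>(currentEntries);
        toRemove.removeAll(newEntries);       // entries present only in the old set

        for (String entry : toAdd) {
            index.computeIfAbsent(entry, k -> new HashSet<>()).add(resourceName);
        }
        for (String entry : toRemove) {
            // Assumption: drop the key once no resource references it, so that a
            // plain key-existence check on the index stays meaningful.
            index.computeIfPresent(entry, (k, names) -> {
                names.remove(resourceName);
                return names.isEmpty() ? null : names;
            });
        }
    }

    Set<String> resourcesFor(String entry) {
        return index.getOrDefault(entry, Set.of());
    }

    boolean hasEntry(String entry) {
        return index.containsKey(entry);
    }
}
```

Only the entries that actually changed are touched, so an update that leaves a resource's entries unchanged does no index work at all.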









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545576394



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging {
 zkClient.getVersionedAclsForResource(resource)
   }
 
-  private def updateCache(resource: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  // Visible for benchmark
+  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): 
Unit = {
+val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match {
+  case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+  case None => Set.empty
+}
+val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+val acesToAdd = newAces.diff(currentAces)
+val acesToRemove = currentAces.diff(newAces)
+
+acesToAdd.foreach(ace => {
+  val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), 
resource.patternType())

Review comment:
   commit b6a766b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545576162



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging {
 zkClient.getVersionedAclsForResource(resource)
   }
 
-  private def updateCache(resource: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  // Visible for benchmark
+  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): 
Unit = {
+val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match {
+  case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+  case None => Set.empty

Review comment:
   commit b6a766b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545531049



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,

Review comment:
   commit b6a766b


[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545529534



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(

Review comment:
   commit b6a766b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545529391



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {

Review comment:
   resourceCache.contains
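
To spell out the suggestion: when only a yes/no answer is needed, a key-existence check avoids fetching the cached value and pattern matching on it, and the loop can stop at the first hit. The sketch below shows the shape of such a check in Java, where `Map.containsKey` plays the role of Scala's `contains`. The class `ExistenceCheck`, the method name, and the string keys are hypothetical stand-ins, not the PR's code.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ExistenceCheck {
    // Returns true as soon as any candidate key is present in the index;
    // the cached resource sets themselves are never read.
    static boolean hasAnyMatch(Map<String, Set<String>> index, List<String> candidateKeys) {
        for (String key : candidateKeys) {
            if (index.containsKey(key)) {
                return true;
            }
        }
        return false;
    }
}
```

This only works if keys are removed from the index once their resource set becomes empty; otherwise a stale empty entry would count as a match.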


[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545529131



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+   resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(

Review comment:
   commit b6a766b228034a442e3a6e8b71ecee78eefdbfd3





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545527187



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with 
BaseAuthorizerTest {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+
+  override def authorizer: Authorizer = interfaceDefaultAuthorizer
+
+  @Before
+  override def setUp(): Unit = {
+super.setUp()
+
+val authorizers = Seq(interfaceDefaultAuthorizer.authorizer)
+
+// Increase maxUpdateRetries to avoid transient failures
+authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue)
+
+val props = TestUtils.createBrokerConfig(0, zkConnect)
+props.put(AclAuthorizer.SuperUsersProp, superUsers)
+
+config = KafkaConfig.fromProps(props)
+authorizers.foreach(a => a.configure(config.originals))
+
+zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,

Review comment:
   I think that the ZooKeeperClient has a different metric group name. I'm 
not sure how the name will be used, though. And yes, we can do that in a 
follow-up PR later.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545525163



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(
+new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+  resourceCache.get(resourceIndex) match {

Review comment:
   Right. To prevent the phantom problems.

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545471749



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)

Review comment:
   ```
   public ResourcePattern(ResourceType resourceType, String name, PatternType patternType) {
       this.resourceType = Objects.requireNonNull(resourceType, "resourceType");
       this.name = Objects.requireNonNull(name, "name");
       this.patternType = Objects.requireNonNull(patternType, "patternType");

       if (resourceType == ResourceType.ANY) {
           throw new IllegalArgumentException("resourceType must not be ANY");
       }

       if (patternType == PatternType.MATCH || patternType == PatternType.ANY) {
           throw new IllegalArgumentException("patternType must not be " + patternType);
       }
   }
   ```
   
   I think the ResourcePattern constructor prevents us from passing 
PatternType.ANY. It's only usable with a Filter.
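
To make the constraint concrete, here is a small sketch that mirrors only the pattern-type check from the constructor quoted above. It uses a local enum rather than the Kafka classes so that it compiles on its own; `PatternCheckSketch`, `checkPatternType` and `accepted` are hypothetical names, and the enum is a stand-in for the real `PatternType`.

```java
import java.util.Objects;

// Hypothetical mirror of the quoted validation, with a local enum
// in place of org.apache.kafka.common.resource.PatternType.
class PatternCheckSketch {
    enum PatternType { UNKNOWN, ANY, MATCH, LITERAL, PREFIXED }

    // Same rule as the quoted constructor: MATCH and ANY are rejected.
    static void checkPatternType(PatternType patternType) {
        Objects.requireNonNull(patternType, "patternType");
        if (patternType == PatternType.MATCH || patternType == PatternType.ANY) {
            throw new IllegalArgumentException("patternType must not be " + patternType);
        }
    }

    // True if the pattern type would pass the check without throwing.
    static boolean accepted(PatternType patternType) {
        try {
            checkPatternType(patternType);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (PatternType t : PatternType.values()) {
            System.out.println(t + " accepted: " + accepted(t));
        }
    }
}
```

Under this rule UNKNOWN passes the check while ANY does not, which is consistent with the Action in the diff being built with PatternType.UNKNOWN.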









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544713670



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
##
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.auth.SimpleAclAuthorizer
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.annotation.nowarn
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+class AuthorizerWrapperTest extends ZooKeeperTestHarness {
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new 
SimpleAclAuthorizer)
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new 
SimpleAclAuthorizer)
+  private var resource: ResourcePattern = _
+  private val superUsers = "User:superuser1; User:superuser2"
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username)
+  private val requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
+  private var config: KafkaConfig = _
+  private var zooKeeperClient: ZooKeeperClient = _
+
+  private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], 
ResourcePattern)] = ArrayBuffer()
+  private val authorizerTestFactory = new AuthorizerTestFactory(
+newRequestContext, addAcls, authorizeByResourceType, removeAcls)
+
+  class CustomPrincipal(principalType: String, name: String) extends 
KafkaPrincipal(principalType, name) {
+override def equals(o: scala.Any): Boolean = false
+  }
+
+  @Before
+  @nowarn("cat=deprecation")
+  override def setUp(): Unit = {
+super.setUp()
+
+val props = TestUtils.createBrokerConfig(0, zkConnect)
+
+props.put(AclAuthorizer.SuperUsersProp, superUsers)
+config = KafkaConfig.fromProps(props)
+wrappedSimpleAuthorizer.configure(config.originals)
+
+props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+config = KafkaConfig.fromProps(props)
+wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
+
+resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
+zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,
+  Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
+  }
+
+  @After
+  override def tearDown(): Unit = {
+val authorizers = Seq(wrappedSimpleAuthorizer, 
wrappedSimpleAuthorizerAllowEveryone)
+authorizers.foreach(a => {
+  a.close()
+})
+zooKeeperClient.close()
+super.tearDown()
+  }
+
+  @Test
+  def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = {
+
authorizerTestFactory.testAuthorizeByResourceTypeMultipleAddAndRemove(wrappedSimpleAuthorizer)
+  }

Review comment:
   commit 092fec70a9547ec07cba999e77be1c0cf79fa275
   commit e5e3d18f57ab22df20133f9841905af384d9b641
   
   These two commits consolidate the class methods and members into 
BaseAuthorizerTest. 
   
   In BaseAuthorizerTest, the only abstract method is an authorizer provider. 
Once a subclass overrides the provider, the test cases defined there run as-is.
   
   Now the test code looks much cleaner. If the changes look too 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544547081



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -116,38 +122,88 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> 
new HashSet<>());
 
 for (int aclId = 0; aclId < aclCount; aclId++) {
-AccessControlEntry ace = new 
AccessControlEntry(principal.toString() + aclId,
-"*", AclOperation.READ, AclPermissionType.ALLOW);
-entries.add(new AclEntry(ace));
+// The principal in the request context we are using

Review comment:
   commit ec80dc4e55758d83835f3ecde381a988d6dd4779









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544546946



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTestFactory.scala
##
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
+import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE}
+import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
+import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, 
TRANSACTIONAL_ID}
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.server.authorizer.Authorizer
+import org.junit.Assert.{assertFalse, assertTrue}
+
+class AuthorizerTestFactory(val newRequestContext3: (KafkaPrincipal, 
InetAddress, ApiKeys) => RequestContext,
+val addAcls: (Authorizer, Set[AccessControlEntry], 
ResourcePattern) => Unit,
+val authorizeByResourceType: (Authorizer, 
RequestContext, AclOperation, ResourceType) => Boolean,
+val removeAcls: (Authorizer, 
Set[AccessControlEntry], ResourcePattern) => Unit) {
+  def newRequestContext(kafkaPrincipal: KafkaPrincipal, inetAddress: 
InetAddress): RequestContext =
+newRequestContext3(kafkaPrincipal, inetAddress, ApiKeys.PRODUCE)
+
+  def testAuthorizeByResourceTypeMultipleAddAndRemove(authorizer: Authorizer): 
Unit = {
+val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+val host1 = InetAddress.getByName("192.168.1.1")
+val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+val denyRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, DENY)
+val allowRead = new AccessControlEntry(user1.toString, 
host1.getHostAddress, READ, ALLOW)
+val u1h1Context = newRequestContext(user1, host1)
+
+for (_ <- 1 to 10) {
+  assertFalse("User1 from host1 should not have READ access to any topic 
when no ACL exists",
+authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+  addAcls(authorizer, Set(allowRead), resource1)
+  assertTrue("User1 from host1 now should have READ access to at least one 
topic",
+authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+  for (_ <- 1 to 10) {
+addAcls(authorizer, Set(denyRead), resource1)
+assertFalse("User1 from host1 now should not have READ access to any 
topic",
+  authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+
+removeAcls(authorizer, Set(denyRead), resource1)
+addAcls(authorizer, Set(allowRead), resource1)
+assertTrue("User1 from host1 now should have READ access to at least 
one topic",
+  authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+  }
+
+  removeAcls(authorizer, Set(allowRead), resource1)
+  assertFalse("User1 from host1 now should not have READ access to any 
topic",
+authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC))
+}
+  }
+
+   def 
testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(authorizer: 
Authorizer): Unit = {
+val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1")
+val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2")
+val host1 = InetAddress.getByName("192.168.1.1")
+val host2 = InetAddress.getByName("192.168.1.2")
+val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), 
LITERAL)
+val resource2 = new ResourcePattern(TOPIC, "sb2" + 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544545938



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,134 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates over all AclBindings multiple times, 
without any indexing,
+ *which is CPU-intensive.
+ * 2. The interface default rebuilds several sets of strings, which is 
memory-intensive.
+ * 3. The interface default cannot perform audit logging properly.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+if (authorize(requestContext, Collections.singletonList(new Action(
+AclOperation.READ,

Review comment:
   commit ec80dc4e55758d83835f3ecde381a988d6dd4779
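
For readers following the Javadoc contract above ("authorized on at least one resource of the given type"), here is a deliberately simplified model. It assumes literal resource names only, a single principal, host and operation, and that a DENY on a resource overrides an ALLOW on that same resource, which is what the add/remove test cases in this thread expect. `AtLeastOneSketch` and its methods are hypothetical and are not the Authorizer API; prefixed patterns and wildcards are left out entirely.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, literal-names-only model of "allowed on at least one resource".
class AtLeastOneSketch {
    private final Set<String> allowed = new HashSet<>();
    private final Set<String> denied = new HashSet<>();

    void allow(String resource) { allowed.add(resource); }
    void deny(String resource) { denied.add(resource); }
    void removeAllow(String resource) { allowed.remove(resource); }
    void removeDeny(String resource) { denied.remove(resource); }

    // ALLOWED if some allowed resource is not also denied; DENIED otherwise.
    boolean authorizeByResourceType() {
        for (String resource : allowed) {
            if (!denied.contains(resource)) {
                return true;
            }
        }
        return false;
    }
}
```

With no ACLs the result is a denial; adding an allow flips it, a deny on the same resource flips it back, and removing that deny restores access.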

##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,134 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates over all AclBindings multiple times, 
without any indexing,
+ *which is CPU-intensive.
+ * 2. The interface default rebuilds several sets of strings, which is 
memory-intensive.
+ * 3. The interface default cannot perform audit logging properly.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+if (authorize(requestContext, Collections.singletonList(new Action(
+AclOperation.READ,
+new ResourcePattern(resourceType, "hardcode", 
PatternType.LITERAL),
+0, false, false)))

Review comment:
   commit ec80dc4e55758d83835f3ecde381a988d6dd4779

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +310,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, false)

Review comment:
   commit 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543114091



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
 val action = new Action(operation, resource, 1, true, true)
 authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
+  private def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {

Review comment:
   I replied here. Maybe I shouldn't have resolved it. 
https://github.com/apache/kafka/pull/9485#discussion_r540706117
   
   Since AuthorizerInterfaceDefaultTest, AclAuthorizerTest, and 
AuthorizerWrapperTest share some test cases, we need to generalize this method 
signature a bit (taking Authorizer rather than AclAuthorizer) so that the 
method reference can be passed to AuthorizerTestFactory.
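
The idea of passing helper methods into a shared factory can be sketched as follows. This is a plain-Java illustration under stated assumptions, not the Scala test code from the PR: `SharedTestFactorySketch`, `addAndRemoveCasePasses`, and the use of `BiConsumer` and `Predicate` for the helper signatures are all hypothetical. The point is that the helpers are typed against a general authorizer type `A`, so one test body can be reused by several test classes.

```java
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical shared test factory: the test logic is written once and the
// authorizer-specific helpers are injected as function values.
class SharedTestFactorySketch<A> {
    private final BiConsumer<A, String> addAcl;
    private final BiConsumer<A, String> removeAcl;
    private final Predicate<A> authorizeByResourceType;

    SharedTestFactorySketch(BiConsumer<A, String> addAcl,
                            BiConsumer<A, String> removeAcl,
                            Predicate<A> authorizeByResourceType) {
        this.addAcl = addAcl;
        this.removeAcl = removeAcl;
        this.authorizeByResourceType = authorizeByResourceType;
    }

    // Shared case: denied with no ACL, allowed after add, denied after remove.
    boolean addAndRemoveCasePasses(A authorizer) {
        boolean ok = !authorizeByResourceType.test(authorizer);
        addAcl.accept(authorizer, "topic-a");
        ok = ok && authorizeByResourceType.test(authorizer);
        removeAcl.accept(authorizer, "topic-a");
        ok = ok && !authorizeByResourceType.test(authorizer);
        return ok;
    }
}
```

If the helpers were typed against one concrete authorizer class, each test class would need its own copy of the factory, which is the duplication the signature change avoids.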
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543114091



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
 val action = new Action(operation, resource, 1, true, true)
 authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
+  private def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {

Review comment:
   I replied here. Maybe I shouldn't have resolved it. 
https://github.com/apache/kafka/pull/9485#discussion_r540706117
   
   Since AuthorizerInterfaceDefaultTest, AclAuthorizerTest, and 
AuthorizerWrapperTest share some test cases, we need to make this method 
signature a bit more abstract so that AuthorizerTestFactory can use it.
   









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543817949



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   commit 62c44ade550a90671ff41bfb847e2bc28adc7baa









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543814681



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +151,126 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * 1. Filter out all the resource pattern corresponding to the 
requestContext, AclOperation,
+ *and ResourceType
+ * 2. If wildcard deny exists, return deny directly
+ * 3. For any literal allowed resource, if there's no dominant literal 
denied resource, and
+ *no dominant prefixed denied resource, return allow
+ * 4. For any prefixed allowed resource, if there's no dominant denied 
resource, return allow
+ * 5. For any other cases, return deny
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+ *which is a CPU intense work.
+ * 2. The interface default rebuild several sets of strings, which is a 
memory intense work.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);

Review comment:
   Good catch. This is super important.
   
   commit dae1a78
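
The five steps listed in the Javadoc above can be sketched roughly as follows. This is a simplified Java illustration, not the code from the PR: `Entry`, `PatternType`, `hasDominantDeny` and `allowedOnSomeResource` are hypothetical names, the entries are assumed to be already filtered by principal, host, operation and resource type (step 1), and "dominant" is read here as "the deny entry covers every resource the allow entry could match", which is an assumption about the intended meaning.

```java
import java.util.Arrays;
import java.util.List;

class ByResourceTypeSketch {
    enum PatternType { LITERAL, PREFIXED }

    // One ACL entry, assumed to be already filtered down to the caller,
    // the operation and the resource type (step 1 of the Javadoc).
    static final class Entry {
        final String name;
        final PatternType patternType;
        final boolean allow;

        Entry(String name, PatternType patternType, boolean allow) {
            this.name = name;
            this.patternType = patternType;
            this.allow = allow;
        }
    }

    static final String WILDCARD = "*";

    // Assumed meaning of "dominant": the deny entry covers every resource
    // that the allow entry could match.
    static boolean hasDominantDeny(Entry allowEntry, List<Entry> entries) {
        for (Entry e : entries) {
            if (e.allow) continue;
            // A prefixed deny covers any allow whose name starts with that prefix.
            boolean coveredByPrefix = e.patternType == PatternType.PREFIXED
                    && allowEntry.name.startsWith(e.name);
            // A literal deny only covers a literal allow on the same name.
            boolean sameLiteral = e.patternType == PatternType.LITERAL
                    && allowEntry.patternType == PatternType.LITERAL
                    && allowEntry.name.equals(e.name);
            if (coveredByPrefix || sameLiteral) return true;
        }
        return false;
    }

    static boolean allowedOnSomeResource(List<Entry> entries) {
        // Step 2: a wildcard deny rejects everything.
        for (Entry e : entries) {
            if (!e.allow && e.patternType == PatternType.LITERAL && WILDCARD.equals(e.name)) {
                return false;
            }
        }
        // Steps 3 and 4: one allow entry that no deny entry dominates is enough.
        for (Entry e : entries) {
            if (e.allow && !hasDominantDeny(e, entries)) return true;
        }
        // Step 5: everything else is denied.
        return false;
    }

    public static void main(String[] args) {
        Entry allowLiteral = new Entry("topic-1", PatternType.LITERAL, true);
        Entry denyPrefix = new Entry("topic", PatternType.PREFIXED, false);
        System.out.println(allowedOnSomeResource(Arrays.asList(allowLiteral)));
        System.out.println(allowedOnSomeResource(Arrays.asList(allowLiteral, denyPrefix)));
    }
}
```

Note that a literal deny on `topic` does not dominate a prefixed allow on `topic` in this sketch, because a name such as `topic-2` would still be allowed.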









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543814563



##
File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;

Review comment:
   Make ResourceTypeKey an inner class of AclAuthorizer 
   
   commit 7fe92c6436432760adf9465c3f0bcf3c91104b10









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543814295



##
File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourceType;
+
+import java.util.Objects;
+
+public class ResourceIndex {

Review comment:
   ResourceTypeKey sounds good:
   
   commit 7fe92c6436432760adf9465c3f0bcf3c91104b10









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543813486



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -100,8 +106,15 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown(): Unit = {
-aclAuthorizer.close()
-aclAuthorizer2.close()
+val authorizers = Seq(aclAuthorizer, aclAuthorizer2)
+authorizers.foreach(a => {
+  a.acls(AclBindingFilter.ANY).forEach(bd => {
+removeAcls(aclAuthorizer, Set(bd.entry), bd.pattern())

Review comment:
   Removed.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543805858



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   Use Immutable collections:
   
   Benchmark                                           (aclCount)  (denyPercentage)  (resourceCount)  Mode  Cnt      Score      Error  Units
   AclAuthorizerBenchmark.testAclsIterator                     50               100               20  avgt    5   4132.824 ± 2967.122  ms/op
   AclAuthorizerBenchmark.testAuthorizeByResourceType          50               100               20  avgt    5     46.733 ±    5.397  ms/op
   AclAuthorizerBenchmark.testAuthorizer                       50               100               20  avgt    5      6.844 ±    0.915  ms/op
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   7219.696 ± 4018.189  ms/op
   JMH benchmarks done
   
   Use Mutable collections:
   
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   4927.832 ± 2570.786  ms/op
   
   
   When aclCount = 50, denyPercentage = 100, resourceCount = 20, the time 
cost is 2.3 seconds more with immutable collections. But since adding 50 * 
2 ACL bindings only takes ~7 seconds, I think the performance should be 
acceptable.
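
As a quick check, the 2.3-second figure follows from the two `testUpdateCache` averages quoted above. The error bars are ignored here, and they are large relative to the difference:

```latex
\[
7219.696\ \text{ms} - 4927.832\ \text{ms} = 2291.864\ \text{ms} \approx 2.3\ \text{s}
\]
```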









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543805858



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   Use Immutable collections:
   
   Benchmark                                           (aclCount)  (denyPercentage)  (resourceCount)  Mode  Cnt      Score      Error  Units
   AclAuthorizerBenchmark.testAclsIterator                     50               100               20  avgt    5   4132.824 ± 2967.122  ms/op
   AclAuthorizerBenchmark.testAuthorizeByResourceType          50               100               20  avgt    5     46.733 ±    5.397  ms/op
   AclAuthorizerBenchmark.testAuthorizer                       50               100               20  avgt    5      6.844 ±    0.915  ms/op
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   7219.696 ± 4018.189  ms/op
   JMH benchmarks done
   
   Use Mutable collections:
   
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   4927.832 ± 2570.786  ms/op
   
   
   When aclCount = 50, denyPercentage = 100, resourceCount = 20, the time 
cost is 2.3 seconds more with immutable collections. But since adding 100 * 
2 ACL bindings only takes ~7 seconds, I think the performance should be 
acceptable.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543696178



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   Use Immutable collections:
   
   Benchmark                                           (aclCount)  (denyPercentage)  (resourceCount)  Mode  Cnt      Score      Error  Units
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5  10639.073 ± 3470.889  ms/op
   
   Use Mutable collections:
   
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   4927.832 ± 2570.786  ms/op
   
   
   The time cost doubled with immutable collections. But since adding 100 * 
2 ACL bindings takes only 10 seconds, adding 2 ACL bindings takes only 
100 ms on average, which should be acceptable.

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   Use Immutable collections:
   
   Benchmark                                           (aclCount)  (denyPercentage)  (resourceCount)  Mode  Cnt      Score      Error  Units
   AclAuthorizerBenchmark.testAclsIterator                     50               100               20  avgt    5   4132.824 ± 2967.122  ms/op
   AclAuthorizerBenchmark.testAuthorizeByResourceType          50               100               20  avgt    5     46.733 ±    5.397  ms/op
   AclAuthorizerBenchmark.testAuthorizer                       50               100               20  avgt    5      6.844 ±    0.915  ms/op
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   7219.696 ± 4018.189  ms/op
   JMH benchmarks done
   
   Use Mutable collections:
   
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   4927.832 ± 2570.786  ms/op
   
   
   The time cost is 2.3 seconds more with immutable collections. But since 
adding 100 * 2 ACL bindings takes only ~7 seconds, I think the performance 
should be acceptable.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543745352



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+  private val superUsers = "User:superuser1; User:superuser2"

Review comment:
   Tests added for AuthorizerInterfaceDefaultTest, AclAuthorizerTest, and 
AuthorizerWrapperTest.
   
   commit dae1a788b70ebc03eab265b1027a4b43ad8e773b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543744182



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +185,39 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+if (denyAllResource(requestContext, op, resourceType)) {
+  AuthorizationResult.DENIED
+} else if (shouldAllowEveryoneIfNoAclIsFound) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}

Review comment:
   Good catch. This is super important. 
   
   commit dae1a788b70ebc03eab265b1027a4b43ad8e773b
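
The precedence in the override quoted above can be read as a three-way decision. The following Java sketch only restates that ordering: `WrapperDecisionSketch`, `Result` and `decide` are hypothetical names, and the supplier is an assumed stand-in for the call to `super.authorizeByResourceType`.

```java
import java.util.function.Supplier;

class WrapperDecisionSketch {
    enum Result { ALLOWED, DENIED }

    // Mirrors the order of checks in the quoted diff:
    // 1. a deny that covers all resources wins,
    // 2. otherwise "allow everyone if no ACL is found" allows,
    // 3. otherwise fall back to the interface default.
    static Result decide(boolean denyAllResource,
                         boolean allowEveryoneIfNoAcl,
                         Supplier<Result> interfaceDefault) {
        if (denyAllResource) return Result.DENIED;
        if (allowEveryoneIfNoAcl) return Result.ALLOWED;
        return interfaceDefault.get();
    }

    public static void main(String[] args) {
        // The deny check comes first, so this is denied even with allow-everyone enabled.
        System.out.println(decide(true, true, () -> Result.ALLOWED));
        // No deny-all and allow-everyone enabled: the fallback is never consulted.
        System.out.println(decide(false, true, () -> Result.DENIED));
        // Neither flag set: the fallback decides.
        System.out.println(decide(false, false, () -> Result.DENIED));
    }
}
```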









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543744087



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,131 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName).toString

Review comment:
   Good catch. This is super important. 
   
   commit dae1a788b70ebc03eab265b1027a4b43ad8e773b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543696178



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   Use Immutable collections:
   
   Benchmark                                           (aclCount)  (denyPercentage)  (resourceCount)  Mode  Cnt      Score      Error  Units
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5  10639.073 ± 3470.889  ms/op
   
   Use Mutable collections:
   
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   4927.832 ± 2570.786  ms/op
   
   
   The time cost doubled with immutable collections. But since adding 100 * 
2 ACL bindings takes only 10 seconds, adding 2 ACL bindings takes only 
100 ms, which should be acceptable.

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   Use Immutable collections:
   
   Benchmark                                           (aclCount)  (denyPercentage)  (resourceCount)  Mode  Cnt      Score      Error  Units
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5  10639.073 ± 3470.889  ms/op
   
   Use Mutable collections:
   
   AclAuthorizerBenchmark.testUpdateCache                      50               100               20  avgt    5   4927.832 ± 2570.786  ms/op
   
   
   The time cost doubled with immutable collections. But since adding 100 * 
2 ACL bindings takes only 10 seconds, adding 2 ACL bindings takes only 
100 ms on average, which should be acceptable.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543174139



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   I tested this with one background thread adding and removing elements of a 
mutable.HashSet while the main thread constantly iterated over the HashSet using 
"foreach". The "foreach" call didn't throw any exception, but I'm unsure what 
would happen if the iteration hits a bucket where elements are being added or 
deleted. 
   
   Let me test the overhead of using the immutable map. I'd prefer this 
approach, as we expect many more READs than WRITEs to the HashSet.
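
For readers unfamiliar with the immutable-collection approach, the idea can be sketched in Java as copy-on-write behind a volatile reference. This is an illustration under stated assumptions, not the PR's Scala code: `SnapshotCache` and its methods are hypothetical names, and plain `String` keys stand in for the cache's key type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SnapshotCache {
    // Readers only ever see a fully built, unmodifiable map.
    private volatile Map<String, Set<String>> snapshot = Collections.emptyMap();

    // Writers are serialized; each write copies, edits the copy, then publishes it.
    synchronized void add(String key, String value) {
        Map<String, Set<String>> copy = new HashMap<>(snapshot);
        Set<String> values = new HashSet<>(copy.getOrDefault(key, Collections.<String>emptySet()));
        values.add(value);
        copy.put(key, Collections.unmodifiableSet(values));
        snapshot = Collections.unmodifiableMap(copy);
    }

    synchronized void remove(String key, String value) {
        Set<String> existing = snapshot.get(key);
        if (existing == null) return;
        Map<String, Set<String>> copy = new HashMap<>(snapshot);
        Set<String> values = new HashSet<>(existing);
        values.remove(value);
        if (values.isEmpty()) {
            copy.remove(key);
        } else {
            copy.put(key, Collections.unmodifiableSet(values));
        }
        snapshot = Collections.unmodifiableMap(copy);
    }

    // Lock-free read: the returned set never changes after it has been handed out.
    Set<String> get(String key) {
        return snapshot.getOrDefault(key, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        SnapshotCache cache = new SnapshotCache();
        cache.add("topic", "User:alice");
        Set<String> before = cache.get("topic");
        cache.add("topic", "User:bob");
        // The earlier snapshot is unaffected by the later write.
        System.out.println(before.size() + " then " + cache.get("topic").size());
    }
}
```

The trade-off matches the one discussed in this thread: every write pays for a copy, while reads take no lock and can iterate a snapshot that no writer will modify.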









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543174139



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   I tested this with one background thread adding and removing elements of a 
mutable.HashSet while the main thread constantly iterated over the HashSet using 
"foreach". The "foreach" call didn't throw any exception. 









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543168812



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   I tested this with one background thread adding elements in order and then 
removing them in order, while the main thread queried the HashSet using "foreach". 
The elements in the query result were not contiguous, which means concurrent 
reads and writes break the "foreach" iteration completely. 
   
   Let me test the performance of the immutable map a bit. I prefer this 
approach since it won't lock the read operation, which is much more frequent.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543168812



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   I tested this with one background thread adding elements in order and then 
removing them in order, while the main thread queried the HashSet using "foreach". 
The elements in the query result were not in order, which means concurrent 
reads and writes break the "foreach" iteration completely. 
   
   Let me test the performance of the immutable map a bit. I prefer this 
approach since it won't lock the read operation, which is much more frequent.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-15 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543134773



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging {
 
   @volatile
   private var aclCache = new 
scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new 
ResourceOrdering)
+
+  private val resourceCache = new 
scala.collection.mutable.HashMap[ResourceIndex,
+scala.collection.mutable.HashSet[String]]()

Review comment:
   Would Scala's "foreach" throw any exception when a READ operation races with 
a WRITE on a HashMap / HashSet? If not, I think we can tolerate some READ 
inconsistency, as ZK also broadcasts the ACL changes to brokers 
asynchronously.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543120808



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/DelegateAuthorizer.scala
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.{lang, util}
+import java.util.concurrent.CompletionStage
+
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
+import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, 
Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, 
AuthorizerServerInfo}
+
+/**
+ * For testing the interface default
+ */
+class DelegateAuthorizer extends Authorizer {

Review comment:
   Makes sense. Done in commit e31f157eaac1213445dd284fd2209a29f4fa18fd









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543118616



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
 val action = new Action(operation, resource, 1, true, true)
 authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
+  private def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {
 val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
 authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
   .map(_.toCompletableFuture.get)
   .foreach { result => result.exception.ifPresent { e => throw e } }
+aclAdded += Tuple3(authorizer, aces, resourcePattern)

Review comment:
   Removed, as we are no longer removing ACLs in tearDown().
   
   commit 825a8ba









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543117875



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -74,6 +75,10 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   private var config: KafkaConfig = _
   private var zooKeeperClient: ZooKeeperClient = _
 
+  private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], 
ResourcePattern)] = ArrayBuffer()

Review comment:
   Removed, as we are no longer removing ACLs in tearDown().
   
   commit 825a8ba77ad1766f998a71a9a15f21e73daad84a

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {

Review comment:
   commit 825a8ba77ad1766f998a71a9a15f21e73daad84a

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+  private val superUsers = "User:superuser1; User:superuser2"
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username)
+  private val requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
+  private var config: KafkaConfig = _
+  private var zooKeeperClient: ZooKeeperClient = _
+  private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], 
ResourcePattern)] = ArrayBuffer()
+  private val authorizerTestFactory = new AuthorizerTestFactory(
+newRequestContext, addAcls, authorizeByResourceType, removeAcls)
+
+  class CustomPrincipal(principalType: String, name: String) extends 
KafkaPrincipal(principalType, name) {
+override def equals(o: scala.Any): Boolean = false
+  }
+
+  @Before
+  override def setUp(): Unit = {
+super.setUp()
+
+val authorizers = Seq(interfaceDefaultAuthorizer.authorizer)
+
+// Increase maxUpdateRetries to avoid transient failures
+authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue)
+
+val props = TestUtils.createBrokerConfig(0, zkConnect)
+props.put(AclAuthorizer.SuperUsersProp, superUsers)
+
+config = KafkaConfig.fromProps(props)
+authorizers.foreach(a => a.configure(config.originals))
+
+zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,
+  Time.SYSTEM, "kafka.test", "AclAuthorizerTest")
+  }
+
+  @After
+  override def tearDown(): Unit = {
+val authorizers = Seq(interfaceDefaultAuthorizer)
+authorizers.foreach(a => {
+  a.acls(AclBindingFilter.ANY).forEach(bd => {
+removeAcls(interfaceDefaultAuthorizer, Set(bd.entry), bd.pattern())

Review comment:
   commit 825a8ba77ad1766f998a71a9a15f21e73daad84a





[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543116084



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {

Review comment:
   For this method, I changed the signature back to AclAuthorizer, since 
AuthorizerTestFactory does not depend on it.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543114091



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
 val action = new Action(operation, resource, 1, true, true)
 authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
+  private def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {

Review comment:
   I replied here. Maybe I shouldn't have resolved it. 
https://github.com/apache/kafka/pull/9485#discussion_r540706117
   
   Since AuthorizerInterfaceDefaultTest, AclAuthorizerTest, and 
AuthorizerWrapperTest share some test utilities, we need to make this method 
signature a bit more abstract so that AuthorizerTestFactory can use it.
   









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543099934



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +185,39 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+if (denyAllResource(requestContext, op, resourceType)) {
+  AuthorizationResult.DENIED
+} else if (shouldAllowEveryoneIfNoAclIsFound) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+  op: AclOperation,
+  resourceType: ResourceType): Boolean = {
+val resourceTypeFilter = new ResourcePatternFilter(
+  resourceType, Resource.WildCardResource, PatternType.LITERAL)
+val principal = new 
KafkaPrincipal(requestContext.principal.getPrincipalType, 
requestContext.principal.getName)
+val host = requestContext.clientAddress().getHostAddress
+val accessControlEntry = new AccessControlEntryFilter(null, null, op, 
AclPermissionType.DENY)
+val aclFilter = new AclBindingFilter(resourceTypeFilter, 
accessControlEntry)
+
+acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), 
principal, host))
+  }
+
+  private def principalHostMatch(ace: AccessControlEntry,
+ principal: KafkaPrincipal,
+ host: String): Boolean = {
+((ace.host() == AclEntry.WildcardHost || ace.host() == host)
+  && (ace.principal() == AclEntry.WildcardPrincipalString || 
ace.principal() == principal.toString))

Review comment:
   Yeah. I was trying to restrict the type in order to remind people to 
construct a KafkaPrincipal first. But toString() is an expensive operation.
   
   commit 16576f85a858648cfc4ff882b554ddc65922021c
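   
   One way to avoid the repeated toString() cost, sketched under assumptions: 
render the principal string once and pass it to the match function. Principal 
and Entry are hypothetical stand-ins for KafkaPrincipal and 
AccessControlEntry, and the wildcard values are assumed for the example:

```scala
// Hypothetical stand-ins for the Kafka types so the sketch compiles on its own.
final case class Principal(principalType: String, name: String) {
  override def toString: String = s"$principalType:$name"
}
final case class Entry(principal: String, host: String)

object PrincipalMatch {
  // Assumed wildcard values, used here for illustration only.
  val WildcardHost = "*"
  val WildcardPrincipal = "User:*"

  // Takes the already-rendered principal string, so nothing is rebuilt per entry.
  def principalHostMatch(ace: Entry, principalStr: String, host: String): Boolean =
    (ace.host == WildcardHost || ace.host == host) &&
      (ace.principal == WildcardPrincipal || ace.principal == principalStr)

  def anyMatch(entries: Seq[Entry], principal: Principal, host: String): Boolean = {
    val principalStr = principal.toString // rendered once, outside the loop
    entries.exists(e => principalHostMatch(e, principalStr, host))
  }
}
```

   Passing a String gives up the type-level reminder mentioned above, so the 
parameter name has to carry that intent instead.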









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543093393



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -71,15 +74,22 @@ object AuthorizerWrapper {
   }
 
   def convertToResource(resourcePattern: ResourcePattern): Resource = {
-Resource(ResourceType.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
+Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
   }
 }
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
 class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.Authorizer) extends Authorizer {
 
+  var shouldAllowEveryoneIfNoAclIsFound = false
+
   override def configure(configs: util.Map[String, _]): Unit = {
 baseAuthorizer.configure(configs)
+shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
+
AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
+  && baseAuthorizer.authorize(
+new Session(KafkaPrincipal.ANONYMOUS, 
InetAddress.getByName("1.2.3.4")),
+Read, new Resource(Topic, "hi", PatternType.LITERAL)))

Review comment:
   So we have three approaches here:
   1. use .getClass
   2. use .isInstanceOf
   3. only configure the property with the key 
"AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp" when constructing the 
AuthorizerWrapper instance, so that no other property gets in.
   
   None of them is perfect, but approach 2 also seems better to me. 
   
   commit 1217394c0c3767ac11df958c02a681c8cbc8382b
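   
   Approaches 1 and 2 differ in how they treat subclasses. A small sketch of 
that difference, using hypothetical SimpleAuthorizer and CustomAuthorizer 
classes rather than the real legacy authorizer types:

```scala
// Hypothetical classes standing in for the legacy authorizer hierarchy.
class SimpleAuthorizer
class CustomAuthorizer extends SimpleAuthorizer

object TypeChecks {
  // Approach 1: true only when the runtime class is exactly SimpleAuthorizer.
  def isExactly(a: AnyRef): Boolean = a.getClass == classOf[SimpleAuthorizer]

  // Approach 2: true for SimpleAuthorizer and for any subclass of it.
  def isKindOf(a: AnyRef): Boolean = a.isInstanceOf[SimpleAuthorizer]
}
```

   With isInstanceOf, a subclass that overrides the authorization logic is 
still treated as the base class, which is one reason neither check is perfect.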









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543087570



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -71,15 +74,22 @@ object AuthorizerWrapper {
   }
 
   def convertToResource(resourcePattern: ResourcePattern): Resource = {
-Resource(ResourceType.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
+Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
   }
 }
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
 class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.Authorizer) extends Authorizer {
 
+  var shouldAllowEveryoneIfNoAclIsFound = false
+
   override def configure(configs: util.Map[String, _]): Unit = {
 baseAuthorizer.configure(configs)
+shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
+
AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
+  && baseAuthorizer.authorize(
+new Session(KafkaPrincipal.ANONYMOUS, 
InetAddress.getByName("1.2.3.4")),
+Read, new Resource(Topic, "hi", PatternType.LITERAL)))

Review comment:
   So we have two approaches here:
   
   1. use baseAuthorizer.getClass == classOf[SimpleAclAuthorizer]
   2. use baseAuthorizer.isInstanceOf[SimpleAclAuthorizer]
   
   Neither is perfect, but approach 2 also seems better to me.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543087570



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -71,15 +74,22 @@ object AuthorizerWrapper {
   }
 
   def convertToResource(resourcePattern: ResourcePattern): Resource = {
-    Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType)
+    Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType)
   }
 }
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
 class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer {
 
+  var shouldAllowEveryoneIfNoAclIsFound = false
+
   override def configure(configs: util.Map[String, _]): Unit = {
     baseAuthorizer.configure(configs)
+    shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
+        AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
+      && baseAuthorizer.authorize(
+        new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")),
+        Read, new Resource(Topic, "hi", PatternType.LITERAL)))

Review comment:
   I think `isInstanceOf` will still return true if `baseAuthorizer` is an instance of a subclass of SimpleAclAuthorizer.
   
   class Father {}
   
   class Son extends Father {}
   
   val father = new Father
   val son = new Son
   println(son.isInstanceOf[Father])
   println(son.getClass == classOf[Father])
   println(son.getClass == classOf[Son])
   
   > true
   > false
   > true









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r543080612



##
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +151,126 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * 1. Filter out all the resource pattern corresponding to the requestContext, AclOperation,
+     *    and ResourceType
+     * 2. If wildcard deny exists, return deny directly
+     * 3. For any literal allowed resource, if there's no dominant literal denied resource, and
+     *    no dominant prefixed denied resource, return allow
+     * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow
+     * 5. For any other cases, return deny

Review comment:
   Sure. commit 25e0bfcc97f956ceb4254ab8c457fe5d8d250e82
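
The five steps listed in the Javadoc above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's implementation: the object and method names are hypothetical, step 1 (filtering by request context, operation and resource type) is assumed to have already produced four plain sets of resource names, and a prefixed DENY is assumed to "dominate" a name when that name starts with the denied prefix.

```scala
object AuthorizeByResourceTypeSketch {
  // Stand-in for ResourcePattern.WILDCARD_RESOURCE.
  val Wildcard = "*"

  // Assumption: a prefixed DENY dominates a name when the name starts with the denied prefix.
  def hasDominantPrefixedDeny(name: String, denyPrefixes: Set[String]): Boolean =
    denyPrefixes.exists(prefix => name.startsWith(prefix))

  // The four sets are what is left after step 1, split by permission and pattern type.
  def isAllowed(allowLiterals: Set[String], allowPrefixes: Set[String],
                denyLiterals: Set[String], denyPrefixes: Set[String]): Boolean = {
    // Step 3: a literal ALLOW counts unless a literal or prefixed DENY dominates it.
    def literalAllowed: Boolean = allowLiterals.exists { literal =>
      literal == Wildcard ||
        (!denyLiterals.contains(literal) && !hasDominantPrefixedDeny(literal, denyPrefixes))
    }
    // Step 4: a prefixed ALLOW counts unless a prefixed DENY dominates it.
    def prefixAllowed: Boolean =
      allowPrefixes.exists(prefix => !hasDominantPrefixedDeny(prefix, denyPrefixes))

    // Step 2: a wildcard DENY rejects everything. Step 5: deny when nothing matched.
    if (denyLiterals.contains(Wildcard)) false
    else literalAllowed || prefixAllowed
  }

  def main(args: Array[String]): Unit = {
    // Literal ALLOW on "orders" is dominated by the prefixed DENY "ord".
    println(isAllowed(Set("orders"), Set.empty, Set.empty, Set("ord"))) // false
    // Prefixed ALLOW "ord" is not dominated by the longer prefixed DENY "orders".
    println(isAllowed(Set.empty, Set("ord"), Set.empty, Set("orders"))) // true
  }
}
```

Note that a wildcard ALLOW is accepted in step 3 without consulting the deny sets, since the wildcard DENY case has already been handled in step 2.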









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r540418386



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -71,15 +73,19 @@ object AuthorizerWrapper {
   }
 
   def convertToResource(resourcePattern: ResourcePattern): Resource = {
-    Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType)
+    Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType)
   }
 }
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
 class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer {
 
+  var shouldAllowEveryoneIfNoAclIsFound = false
+
   override def configure(configs: util.Map[String, _]): Unit = {
     baseAuthorizer.configure(configs)
+    shouldAllowEveryoneIfNoAclIsFound = configs.asScala.get(
+      AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)

Review comment:
   Given that we probably don't want to change the deprecated Authorizer interface, I can only think of one way to achieve this:
   
   Besides checking whether `AllowEveryoneIfNoAclIsFoundProp` is set to `true`, I added another check that authorizes a hardcoded session, operation, and resource.
   
   Since configure() is called immediately after the authorizer is instantiated, it's guaranteed that no ACLs exist when we do this check.
   
   override def configure(configs: util.Map[String, _]): Unit = {
     baseAuthorizer.configure(configs)
     shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
         AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
       && baseAuthorizer.authorize(
         new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")),
         Read, new Resource(Topic, "hi", PatternType.LITERAL)))
   }
   
   commit 2ed79a0a7788f8841475badfd1c26adf0eb3435c









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-13 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r542039322



##
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   Benchmark result: https://paste.ubuntu.com/p/zvjZC4QkMM/
   The performance pattern doesn't change, except that `testUpdateCache` now runs much faster.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-13 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName).toString
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, true)
+  return AuthorizationResult.ALLOWED
+}
+
+val allowLiterals = matchingResources(
+  principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+val denyPrefixes = matchingResources(
+  principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[mutable.HashSet[String]] = {

Review comment:
   Because we don't want to build a new large set containing all the matching resources. Instead, we construct a List of about 3 * 3 * 3 elements, each of which refers to an existing HashSet maintained by `updateCache`.
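
A minimal sketch of that design choice, under stated assumptions: `Key` is a hypothetical stand-in for the PR's `ResourceIndex`, the operation is a plain string, and `"User:*"`, `"*"` and `"ALL"` stand in for the wildcard principal, wildcard host and `AclOperation.ALL`. The point it illustrates is that the returned list holds references to the cached sets, so nothing is copied or merged.

```scala
import scala.collection.mutable

object MatchingResourcesSketch {
  // Hypothetical cache key; the PR builds a ResourceIndex from an ACE,
  // a resource type and a pattern type instead.
  final case class Key(principal: String, host: String, op: String)

  // Collect references to the cached sets rather than merging them into a new set.
  def matching(cache: collection.Map[Key, mutable.HashSet[String]],
               principal: String, host: String, op: String): List[mutable.HashSet[String]] =
    for {
      p <- List(principal, "User:*")
      h <- List(host, "*")
      o <- List(op, "ALL")
      resources <- cache.get(Key(p, h, o)).toList
    } yield resources

  def main(args: Array[String]): Unit = {
    val cached = mutable.HashSet("topic-a")
    val cache = mutable.Map(Key("User:*", "*", "WRITE") -> cached)
    val matched = matching(cache, "User:alice", "10.0.0.1", "WRITE")
    cached += "topic-b" // a later cache update is visible through the list
    println(matched.map(_.size)) // List(2)
  }
}
```

The cost of a lookup is bounded by the number of key combinations probed, independent of how many resource names each cached set contains.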









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541779975



##
File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1117,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
 val action = new Action(operation, resource, 1, true, true)
 authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
+  private def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {
 val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
 authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
   .map(_.toCompletableFuture.get)
   .foreach { result => result.exception.ifPresent { e => throw e } }
+aclAdded += Tuple3(authorizer, aces, resourcePattern)
   }
 
-  private def removeAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {
+  private def removeAcls(authorizer: Authorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {

Review comment:
   Reverted other test changes
   commit 4f9b79a810c4da3030fe262d4bfdc97df4945e8c









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539787542



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName).toString
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, true)
+  return AuthorizationResult.ALLOWED
+}
+
+val allowLiterals = matchingResources(
+  principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+val denyPrefixes = matchingResources(
+  principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[mutable.HashSet[String]] = {
+var matched = List[mutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val ace = new AccessControlEntry(p, h, o, permission)
+  val resourceIndex = new ResourceIndex(ace, resourceType, patternType)
+  resourceCache.get(resourceIndex) match {
+case Some(resources) => matched = matched :+ resources
+case None =>
+  }
+}
+  }
+}
+matched
+  }
+
+  def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean =
+denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[mutable.Set[String]], 
allowPrefixes: List[mutable.Set[String]],
+   denyLiterals: List[mutable.Set[String]], denyPrefixes: 
List[mutable.Set[String]]): Boolean = {
+(allowPrefixes.exists(prefixes =>
+  prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes)))
+  || allowLiterals.exists(literals =>
+literals.exists(literal => allowLiteral(literal, denyLiterals, 
denyPrefixes
+  }
+
+  private def allowLiteral(literalName: String,
+   denyLiterals: List[mutable.Set[String]], 
denyPrefixes: List[mutable.Set[String]]): Boolean = {
+literalName match{
+  case ResourcePattern.WILDCARD_RESOURCE => true
+  case _ => (denyLiterals.forall(denyLiterals => 
!denyLiterals.contains(literalName))
+&& !hasDominantPrefixedDeny(literalName, denyPrefixes))
+}
+  }
+
+  private def allowPrefix(prefixName: String,
+  denyPrefixes: List[mutable.Set[String]]): Boolean = {
+!hasDominantPrefixedDeny(prefixName, denyPrefixes)
+  }
+
+  private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: 
List[mutable.Set[String]]): Boolean = {

Review comment:
   I tried to share it, but the different collection types between Java and Scala are a headache. We would then need either Java converters or a Java collection instantiated in the Scala code. Do you think it's worth it?
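
For reference, a rough sketch of what the converter route could look like, assuming Scala 2.13 or later (on 2.12 the equivalent import is `scala.collection.JavaConverters._`). `SharedHelperSketch` and both method signatures are hypothetical, the shared helper is written in Scala here only to keep the example self-contained, and the `startsWith` check is an assumed reading of "dominant prefixed deny". `asJava` wraps the existing collections instead of copying them.

```scala
import java.util
import scala.collection.mutable
import scala.jdk.CollectionConverters._

object SharedHelperSketch {
  // Hypothetical stand-in for a helper living on the Java side,
  // which therefore takes Java collections.
  def hasDominantPrefixedDeny(name: String,
                              denyPrefixes: util.Collection[util.Set[String]]): Boolean =
    denyPrefixes.asScala.exists(_.asScala.exists(prefix => name.startsWith(prefix)))

  // Scala caller: each mutable.Set is wrapped as a java.util.Set view, and the
  // List is wrapped as a java.util.List view; no elements are copied.
  def check(name: String, denyPrefixes: List[mutable.Set[String]]): Boolean =
    hasDominantPrefixedDeny(name, denyPrefixes.map(_.asJava).asJava)

  def main(args: Array[String]): Unit = {
    println(check("orders-eu", List(mutable.Set("orders")))) // true
    println(check("payments", List(mutable.Set("orders"))))  // false
  }
}
```

The conversion itself is one line at the call site; the trade-off is the extra wrapper objects created on each call.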






[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = {

Review comment:
   I think we'd want to avoid constructing extremely large hash sets and keep 
the size under a threshold (< 100k), so that we don't incur a long wait for 
resizing. What do you think?
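   To make the idea concrete, here is a small sketch of one way to keep 
individual sets bounded. It is only an illustration under my own assumptions, 
not what `matchingResources` does: `BoundedSetsSketch`, `partition`, and 
`containsAny` are hypothetical names, and the cap is passed in rather than 
fixed at 100k.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: spread names over several bounded hash sets instead of
// growing one very large set that would need repeated, expensive resizing.
final class BoundedSetsSketch {
    private BoundedSetsSketch() {}

    // Splits names into sets that each hold at most maxPerSet entries.
    // A name repeated in the input may end up in more than one set; lookups
    // through containsAny are unaffected by that.
    static List<Set<String>> partition(Iterable<String> names, int maxPerSet) {
        if (maxPerSet <= 0) {
            throw new IllegalArgumentException("maxPerSet must be positive");
        }
        List<Set<String>> result = new ArrayList<>();
        Set<String> current = new HashSet<>();
        for (String name : names) {
            if (current.size() >= maxPerSet) {
                result.add(current);
                current = new HashSet<>();
            }
            current.add(name);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    // Membership test across all of the bounded sets.
    static boolean containsAny(List<Set<String>> sets, String name) {
        for (Set<String> set : sets) {
            if (set.contains(name)) {
                return true;
            }
        }
        return false;
    }
}
```

   The trade-off is that a lookup probes every set in the list, so this only 
pays off while the number of sets stays small.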









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   The existing benchmark does not have any DENY resource in it. Adding 
some DENY bindings whose percentage is controlled by parameters will be an 
improvement to the existing benchmark and help us understand the performance 
better. 
   
   I've reverted all changes other than adding some DENY bindings. Also, I 
moved the memory-intensive operations into the @Setup phase, so the benchmark 
now measures only updateCache(). Does the benchmark look good to you now?
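   As a rough illustration of a parameter-controlled DENY share (not the 
benchmark code itself), the sketch below picks DENY or ALLOW per binding from a 
percentage. `DenyMixSketch`, its `Permission` enum, and the explicit `seed` 
argument are assumptions made for the example; a fixed seed is used only so 
that runs are reproducible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: choose DENY or ALLOW for each generated binding so that
// roughly denyPercentage percent of them are DENY.
final class DenyMixSketch {
    enum Permission { ALLOW, DENY }

    private DenyMixSketch() {}

    static List<Permission> permissions(int count, double denyPercentage, long seed) {
        if (denyPercentage < 0.0 || denyPercentage > 100.0) {
            throw new IllegalArgumentException("denyPercentage must be in [0, 100]");
        }
        Random rand = new Random(seed);
        List<Permission> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // nextDouble() is in [0.0, 1.0), so 0 never yields DENY and 100 always does.
            boolean deny = rand.nextDouble() < denyPercentage / 100.0;
            result.add(deny ? Permission.DENY : Permission.ALLOW);
        }
        return result;
    }
}
```

   Generating this list once in the setup phase keeps the random draws out of 
the measured method.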
   
   commit 6536cea788210860a764f3f0a6901244e8d974fe
   
   
   @rajinisivaram









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   The existing benchmark does not have any DENY resource in it. Adding 
some DENY bindings whose percentage is controlled by parameters will be an 
improvement to the existing benchmark and help us understand the performance 
better. 
   
   I've reverted all changes other than adding some DENY bindings. Does the 
benchmark look good to you now?
   
   commit 6536cea788210860a764f3f0a6901244e8d974fe
   
   
   @rajinisivaram









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   The existing benchmark does not have any DENY resource in it. Adding 
some DENY bindings whose percentage is controlled by parameters will be an 
improvement to the existing benchmark and help us understand the performance 
better. 
   
   I've reverted all changes other than adding some DENY bindings. Does the 
benchmark look good to you now?
   @rajinisivaram









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   The existing benchmark does not have any DENY resource in it. Adding 
some DENY resources whose percentage is controlled by parameters will be an 
improvement to the existing benchmark and help us understand the performance 
better. 
   
   I've reverted all changes other than adding some DENY bindings. Does the 
benchmark look good to you now?
   @rajinisivaram









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-12 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   The existing benchmark does not have any DENY resource in it. Adding 
some DENY resources whose percentage is controlled by parameters will be an 
improvement to the existing benchmark and help us understand the performance 
better. 
   
   I've reverted all changes other than adding some DENY resources. Does the 
benchmark look good to you now?
   @rajinisivaram









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-11 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   For this line, since the benchmark is creating the random resource name 
in several places, I'm trying to build a unified way to build it.
   
   The existing benchmark does not have any DENY resource in it. Adding some 
DENY resources whose percentage is controlled by parameters will be an 
improvement to the existing benchmark and help us understand the performance 
better. In addition, I think adding different types of patterns would also 
help. 
   
   What do you think? @rajinisivaram









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-11 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541212190



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -105,49 +120,76 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 field.set(obj, value);
 }
 
-private TreeMap prepareAclCache() {
+private void prepareAclCache() throws UnknownHostException {
 Map> aclEntries = new HashMap<>();
 for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
 ResourcePattern resource = new ResourcePattern(
 (resourceId % 10 == 0) ? ResourceType.GROUP : 
ResourceType.TOPIC,
-resourceNamePrefix + resourceId,
+resourceName(resourceNamePrefix),

Review comment:
   For this line, since the benchmark is creating the random resource name 
in several places, I'm trying to build a unified way to build it.
   
   I think the existing benchmark does not have any DENY resource in it. Adding 
some DENY resources whose percentage is controlled by parameters will be an 
improvement to the existing benchmark and help us understand the performance 
better. In addition, I think adding different types of patterns would also 
help. 
   
   What do you think? @rajinisivaram









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-11 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541206648



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +73,44 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"20"})

Review comment:
   Yes









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-11 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541206048



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +73,44 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"20"})
 private int resourceCount;
 //no. of. rules per resource
-@Param({"10", "50"})
+@Param({"50"})

Review comment:
   Yes.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-11 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r541202572



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +73,44 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"20"})
 private int resourceCount;
 //no. of. rules per resource
-@Param({"10", "50"})
+@Param({"50"})
 private int aclCount;
 
+@Param({"0", "20", "50", "90", "99", "99.9", "99.99", "100"})
+private double denyPercentage;
+
 private final int hostPreCount = 1000;
 private final String resourceNamePrefix = "foo-bar35_resource-";
+private final String resourceName = resourceNamePrefix + 95;
 
 private final AclAuthorizer aclAuthorizer = new AclAuthorizer();
 private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
 private List actions = new ArrayList<>();
 private RequestContext context;
 
+private TreeMap aclCache = new 
TreeMap<>(new AclAuthorizer.ResourceOrdering());
+private scala.collection.mutable.HashMap> resourceCache =
+new scala.collection.mutable.HashMap<>();
+Random rand = new Random(System.currentTimeMillis());
+double eps = 1e-9;
+
 @Setup(Level.Trial)
 public void setup() throws Exception {
-setFieldValue(aclAuthorizer, 
AclAuthorizer.class.getDeclaredField("aclCache").getName(),
-prepareAclCache());
+prepareAclCache();
+setFieldValue(aclAuthorizer, 
AclAuthorizer.class.getDeclaredField("aclCache").getName(), aclCache);
+setFieldValue(aclAuthorizer, 
AclAuthorizer.class.getDeclaredField("resourceCache").getName(), resourceCache);
 // By adding `-95` to the resource name prefix, we cause the 
`TreeMap.from/to` call to return
 // most map entries. In such cases, we rely on the filtering based on 
`String.startsWith`
 // to return the matching ACLs. Using a more efficient data structure 
(e.g. a prefix
 // tree) should improve performance significantly).
 actions = Collections.singletonList(new Action(AclOperation.WRITE,
-new ResourcePattern(ResourceType.TOPIC, resourceNamePrefix + 95, 
PatternType.LITERAL),
+new ResourcePattern(ResourceType.TOPIC, resourceName, 
PatternType.LITERAL),

Review comment:
   Yeah. I defined `resourceName = resourceNamePrefix + 95` so that the 
variable could be re-used. We can revert it.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-10 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r540706117



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -1040,19 +1117,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
   securityProtocol, ClientInformation.EMPTY, false)
   }
 
-  private def authorize(authorizer: AclAuthorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
+  private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
 val action = new Action(operation, resource, 1, true, true)
 authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
-  private def addAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = {
+  private def authorizeByResourceType(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean 
= {
+authorizer.authorizeByResourceType(requestContext, operation, 
resourceType) == AuthorizationResult.ALLOWED
+  }
+
+  private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], 
resourcePattern: ResourcePattern): Unit = {
 val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) }
 authorizer.createAcls(requestContext, bindings.toList.asJava).asScala
   .map(_.toCompletableFuture.get)
   .foreach { result => result.exception.ifPresent { e => throw e } }
+aclAdded += Tuple3(authorizer, aces, resourcePattern)
   }
 
-  private def removeAcls(authorizer: AclAuthorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {
+  private def removeAcls(authorizer: Authorizer, aces: 
Set[AccessControlEntry], resourcePattern: ResourcePattern): Boolean = {

Review comment:
   Since `AuthorizerInterfaceDefaultTest`, `AclAuthorizerTest`, and 
`AuthorizerWrapperTest` share some test utils, we need to make this method 
signature a bit more abstract so that it can be used by 
AuthorizerTestFactory.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-10 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r540705023



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -100,8 +106,15 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown(): Unit = {
-aclAuthorizer.close()
-aclAuthorizer2.close()
+val authorizers = Seq(aclAuthorizer, aclAuthorizer2)
+authorizers.foreach(a => {
+  a.acls(AclBindingFilter.ANY).forEach(bd => {
+removeAcls(aclAuthorizer, Set(bd.entry), bd.pattern())

Review comment:
   Otherwise "deny all" will remain in ZK during the whole test process 
since ZK won't be restarted or re-instantiated.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-10 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r540658692



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+if (denyAllResource(requestContext, op, resourceType)) {
+  AuthorizationResult.DENIED
+} else if (shouldAllowEveryoneIfNoAclIsFound) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+  op: AclOperation,
+  resourceType: ResourceType): Boolean = {

Review comment:
   Commit 8263bd3 changed the AuthorizerWrapper logic and optimized the 
performance a bit.
   
   Now AuthorizerWrapper#denyAllResource will 
   1. use Authorizer#acls() only to fetch the bindings on the `WildcardResource` 
with the pattern type `LITERAL`. 
   2. check whether any of the fetched bindings match the `request principal` 
and `request host`. 
   
   So its behavior now diverges more from the interface default.
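   The two steps above can be sketched roughly as follows. This is a simplified 
model, not the wrapper's code: `DenyAllSketch` and `DenyBinding` are 
hypothetical stand-ins for the Kafka ACL types, the wildcard constants are 
assumed values, and operation matching is left out to keep the example short.

```java
import java.util.List;

// Hypothetical, simplified model; none of these types are Kafka classes.
final class DenyAllSketch {
    static final String WILDCARD_RESOURCE = "*";       // assumed literal wildcard resource name
    static final String WILDCARD_PRINCIPAL = "User:*"; // assumed wildcard principal
    static final String WILDCARD_HOST = "*";           // assumed wildcard host

    // Minimal stand-in for a DENY binding of the requested resource type.
    static final class DenyBinding {
        final String resourceName;
        final boolean literal;
        final String principal;
        final String host;

        DenyBinding(String resourceName, boolean literal, String principal, String host) {
            this.resourceName = resourceName;
            this.literal = literal;
            this.principal = principal;
            this.host = host;
        }
    }

    private DenyAllSketch() {}

    static boolean denyAllResource(List<DenyBinding> denyBindings, String requestPrincipal, String requestHost) {
        for (DenyBinding b : denyBindings) {
            // Step 1: only DENY bindings on the LITERAL wildcard resource are relevant.
            boolean onLiteralWildcard = b.literal && WILDCARD_RESOURCE.equals(b.resourceName);
            if (!onLiteralWildcard) {
                continue;
            }
            // Step 2: the binding must apply to the request's principal and host.
            boolean principalMatches = b.principal.equals(requestPrincipal) || b.principal.equals(WILDCARD_PRINCIPAL);
            boolean hostMatches = b.host.equals(requestHost) || b.host.equals(WILDCARD_HOST);
            if (principalMatches && hostMatches) {
                return true;
            }
        }
        return false;
    }
}
```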









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-10 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r540655651



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+if (denyAllResource(requestContext, op, resourceType)) {
+  AuthorizationResult.DENIED
+} else if (shouldAllowEveryoneIfNoAclIsFound) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+  op: AclOperation,
+  resourceType: ResourceType): Boolean = {
+val resourceTypeFilter = new ResourcePatternFilter(
+  resourceType, null, PatternType.ANY)
+val principal = new 
KafkaPrincipal(requestContext.principal.getPrincipalType, 
requestContext.principal.getName)
+val accessControlEntry = new AccessControlEntryFilter(
+  principal.toString, requestContext.clientAddress().getHostAddress, op, 
AclPermissionType.DENY)

Review comment:
   Good catch. Fixed in commit 8263bd319f63d39808f90129db55427b98385dd4.
   Since it's a bit hard to test `AllowAnyoneIfNoAclFound` and much of the other 
logic in the integration test, I added a new test class, 
`AuthorizerWrapperTest`.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-10 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r540655651



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+if (denyAllResource(requestContext, op, resourceType)) {
+  AuthorizationResult.DENIED
+} else if (shouldAllowEveryoneIfNoAclIsFound) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+  op: AclOperation,
+  resourceType: ResourceType): Boolean = {
+val resourceTypeFilter = new ResourcePatternFilter(
+  resourceType, null, PatternType.ANY)
+val principal = new 
KafkaPrincipal(requestContext.principal.getPrincipalType, 
requestContext.principal.getName)
+val accessControlEntry = new AccessControlEntryFilter(
+  principal.toString, requestContext.clientAddress().getHostAddress, op, 
AclPermissionType.DENY)

Review comment:
   Good catch. Fixed in commit 83d1f8840933a28934f261ca6affc820a8ececd5.
   Since it's a bit hard to test `AllowAnyoneIfNoAclFound` and much of the other 
logic in the integration test, I added a new test class, 
`AuthorizerWrapperTest`.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-10 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r540418386



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -71,15 +73,19 @@ object AuthorizerWrapper {
   }
 
   def convertToResource(resourcePattern: ResourcePattern): Resource = {
-Resource(ResourceType.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
+Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), 
resourcePattern.name, resourcePattern.patternType)
   }
 }
 
 @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
 class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.Authorizer) extends Authorizer {
 
+  var shouldAllowEveryoneIfNoAclIsFound = false
+
   override def configure(configs: util.Map[String, _]): Unit = {
 baseAuthorizer.configure(configs)
+shouldAllowEveryoneIfNoAclIsFound = configs.asScala.get(
+  
AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)

Review comment:
   Given that we probably don't want to change the deprecated Authorizer 
interface, I can only think of one way to achieve this:
   
   Besides checking that `AllowEveryoneIfNoAclIsFoundProp` exists and is set 
to `true`, I added another check that authorizes a hardcoded session, 
operation, and resource.
   
   Since configure() is called immediately after the authorizer is 
instantiated, it's guaranteed that no ACLs exist when we do this check. 
   
       override def configure(configs: util.Map[String, _]): Unit = {
         baseAuthorizer.configure(configs)
         shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
             AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
           && baseAuthorizer.authorize(
             new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")),
             Read, new Resource(Topic, "hi", PatternType.LITERAL)))
       }
   
   commit 2ed79a0a7788f8841475badfd1c26adf0eb3435c









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539787542



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = {
+    var matched = List[mutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val ace = new AccessControlEntry(p, h, o, permission)
+          val resourceIndex = new ResourceIndex(ace, resourceType, patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean =
+    denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[mutable.Set[String]], allowPrefixes: List[mutable.Set[String]],
+                       denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = {
+    (allowPrefixes.exists(prefixes =>
+      prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes)))
+      || allowLiterals.exists(literals =>
+        literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes))))
+  }
+
+  private def allowLiteral(literalName: String,
+                           denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = {
+    literalName match{
+      case ResourcePattern.WILDCARD_RESOURCE => true
+      case _ => (denyLiterals.forall(denyLiterals => !denyLiterals.contains(literalName))
+        && !hasDominantPrefixedDeny(literalName, denyPrefixes))
+    }
+  }
+
+  private def allowPrefix(prefixName: String,
+                          denyPrefixes: List[mutable.Set[String]]): Boolean = {
+    !hasDominantPrefixedDeny(prefixName, denyPrefixes)
+  }
+
+  private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: List[mutable.Set[String]]): Boolean = {

Review comment:
   I was trying to share it, but the different collection types between Java and Scala are a headache. We would then need some Java converters, or would have to instantiate a Java collection in the Scala code. Do you think we should do this?
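
For illustration, a shareable version of the check could be written once against `java.util` collections, as in the sketch below. The body of `hasDominantPrefixedDeny` is cut off in the quoted diff, so the logic here is an assumption (a deny "dominates" when some prefix of the resource name, including the full name, is a denied prefix), and `PrefixDenyCheck` is a hypothetical class name. Scala callers would still have to convert their sets, which is the cost the comment is weighing.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper; not part of the Kafka code base.
final class PrefixDenyCheck {
    private PrefixDenyCheck() { }

    // Returns true if any non-empty prefix of resourceName (up to and
    // including the full name) appears in any of the deny-prefix sets.
    static boolean hasDominantPrefixedDeny(String resourceName, List<Set<String>> denyPrefixes) {
        StringBuilder prefix = new StringBuilder();
        for (char ch : resourceName.toCharArray()) {
            prefix.append(ch);
            String candidate = prefix.toString();
            for (Set<String> denied : denyPrefixes) {
                if (denied.contains(candidate)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Each candidate prefix costs one hash lookup per set, so the work grows with the length of the name and the number of sets, not with the number of denied prefixes.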






[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539782093



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = {

Review comment:
   I was thinking that we'd want to avoid constructing extremely large hash sets, so that we don't incur a long wait for resizing. What do you think?
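
As a rough illustration of the trade-off: merging every matched set into one `HashSet` copies each element and can trigger repeated rehashing, whereas keeping a list of references to the existing sets copies nothing. The sketch below shows the second approach in Java; `MatchedSets` is a hypothetical name and not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical holder that queries several existing sets without merging them.
final class MatchedSets {
    private final List<Set<String>> sets = new ArrayList<>();

    // Stores a reference to the caller's set; no elements are copied.
    void add(Set<String> resources) {
        sets.add(resources);
    }

    // Membership test across all collected sets: one hash lookup per set.
    boolean containsAny(String name) {
        for (Set<String> s : sets) {
            if (s.contains(name)) {
                return true;
            }
        }
        return false;
    }

    int setCount() {
        return sets.size();
    }
}
```

The cost moves from construction time to lookup time: each query touches every collected set, which stays cheap as long as the number of sets is small.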









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539751258



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)

Review comment:
   Yes. commit 1dc143fc78a3b9927189751255346ef0b6cafd90
   if (noDeny) {
     if (hasAllow) {
       return AuthorizationResult.ALLOWED
     } else {
       return AuthorizationResult.DENIED // since no allow exists
     }
   }
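
The short-circuit in this comment can be written as a small function, sketched here in Java under stated assumptions: `Decision` is a hypothetical stand-in for `AuthorizationResult`, and `noDeny` and `hasAllow` are assumed to be computed by the caller. An empty result means a deny ACL exists and the full prefix and literal comparison is still required.

```java
import java.util.Optional;

// Hypothetical stand-in for AuthorizationResult.
enum Decision { ALLOWED, DENIED }

// Hypothetical helper illustrating the fast path only.
final class FastPath {
    private FastPath() { }

    // With no matching deny ACL, the outcome depends only on whether any
    // allow ACL exists. Otherwise the caller must fall back to the slow path.
    static Optional<Decision> decide(boolean noDeny, boolean hasAllow) {
        if (!noDeny) {
            return Optional.empty();
        }
        return Optional.of(hasAllow ? Decision.ALLOWED : Decision.DENIED);
    }
}
```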









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539737287



##
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -164,4 +206,28 @@ public void testAclsIterator() {
     public void testAuthorizer() {
         aclAuthorizer.authorize(context, actions);
     }
+
+    @Benchmark
+    public void testAuthorizeByResourceType() {
+        aclAuthorizer.authorizeByResourceType(context, AclOperation.WRITE, ResourceType.TOPIC);
+    }
+
+    @Benchmark
+    public void testUpdateCache() {
+        AclAuthorizer aclAuthorizer = new AclAuthorizer();
+        scala.collection.mutable.Set entries = new scala.collection.mutable.HashSet<>();
+        for (int i = 0; i < resourceCount; i ++){
+            scala.collection.immutable.Set immutable = new scala.collection.immutable.HashSet<>();
+            for (int j = 0; j < aclCount; j++) {
+                entries.add(new AclEntry(new AccessControlEntry(
+                    principal.toString(), "127.0.0" + j, AclOperation.WRITE, AclPermissionType.ALLOW)));
+                immutable = entries.toSet();
+            }
+            aclAuthorizer.updateCache(

Review comment:
   Sure









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539737110



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = {
+    var matched = List[mutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {

Review comment:
   commit 2fd4babe2c27ee0723fa1cd720ca35d2bbefe57b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539736828



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare
+  override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName).toString
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val allowLiterals = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+    val denyPrefixes = matchingResources(
+      principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = {
+    var matched = List[mutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val ace = new AccessControlEntry(p, h, o, permission)
+          val resourceIndex = new ResourceIndex(ace, resourceType, patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean =

Review comment:
   Yes. commit 2fd4babe2c27ee0723fa1cd720ca35d2bbefe57b









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539734983



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare

Review comment:
   Yeah. Removed.









[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539734315



##
File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourceType;
+
+import java.util.Objects;
+
+public class ResourceIndex {

Review comment:
   I used "index" because it serves as the index (key) into the hash map. What about something like `ResourceNameFilter`?
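
To make the "index into the hash map" role concrete, here is a minimal Java sketch of a value-based key and a cache built on it. It is an illustration only: `ResourceKey` and `ResourceCache` are hypothetical names, and plain strings stand in for the `AccessControlEntry`, `ResourceType`, and `PatternType` that the quoted diff passes to the `ResourceIndex` constructor.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical simplified key with strings in place of the real Kafka types.
final class ResourceKey {
    private final String ace;
    private final String resourceType;
    private final String patternType;

    ResourceKey(String ace, String resourceType, String patternType) {
        this.ace = Objects.requireNonNull(ace);
        this.resourceType = Objects.requireNonNull(resourceType);
        this.patternType = Objects.requireNonNull(patternType);
    }

    // Value equality, so two keys built from the same fields hit the same entry.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ResourceKey)) return false;
        ResourceKey that = (ResourceKey) o;
        return ace.equals(that.ace)
            && resourceType.equals(that.resourceType)
            && patternType.equals(that.patternType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ace, resourceType, patternType);
    }
}

// Hypothetical cache mapping each key to the resource names it covers.
final class ResourceCache {
    private final Map<ResourceKey, Set<String>> byKey = new HashMap<>();

    void add(ResourceKey key, String resourceName) {
        byKey.computeIfAbsent(key, k -> new HashSet<>()).add(resourceName);
    }

    Set<String> get(ResourceKey key) {
        return byKey.getOrDefault(key, Collections.emptySet());
    }
}
```

Whatever the class ends up being called, it needs consistent `equals` and `hashCode` implementations to work as a map key.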








