[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-17 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545261128



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+  if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+  || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+logAuditMessage(requestContext, action, authorized = true)
+return AuthorizationResult.ALLOWED
+  } else {
+logAuditMessage(requestContext, action, authorized = false)
+return AuthorizationResult.DENIED
+  }
+}
+
+val allowLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, authorized = false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+var matched = List[immutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val resourceIndex = new ResourceTypeKey(

Review comment:
   Rename `resourceIndex` to `resourceTypeKey`. Also, we can omit `new` for 
`ResourceTypeKey` since it is a case class.

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName)
+
+if (isSuperUser(principal))
+  return AuthorizationResult.ALLOWED
+
+val principalStr = principal.toString
+
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, authorized = false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, authorized = true)
+  return AuthorizationResult.ALLOWED
+}
+
+val denyPrefixes = matchingResources(
+  principalStr, host, op, AclPermissionType.DENY, resourceType, 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-16 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r544223630



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,134 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+ *which is a CPU intense work.
+ * 2. The interface default rebuild several sets of strings, which is a 
memory intense work.
+ * 3. The interface default cannot perform the audit logging properly
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+if (authorize(requestContext, Collections.singletonList(new Action(
+AclOperation.READ,
+new ResourcePattern(resourceType, "hardcode", 
PatternType.LITERAL),
+0, false, false)))

Review comment:
   Use `logIfAllowed=true` since we are granting access in that case.

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
##
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util.UUID
+
+import kafka.security.auth.SimpleAclAuthorizer
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.annotation.nowarn
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+class AuthorizerWrapperTest extends ZooKeeperTestHarness {
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new 
SimpleAclAuthorizer)
+  @nowarn("cat=deprecation")
+  private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new 
SimpleAclAuthorizer)
+  private var resource: ResourcePattern = _
+  private val superUsers = "User:superuser1; User:superuser2"
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username)
+  private val requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
+  private var config: KafkaConfig = _
+  private var zooKeeperClient: ZooKeeperClient = _
+
+  private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], 
ResourcePattern)] = ArrayBuffer()
+  private val authorizerTestFactory = new AuthorizerTestFactory(
+  

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-14 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r542830098



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +151,126 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * 1. Filter out all the resource pattern corresponding to the 
requestContext, AclOperation,
+ *and ResourceType
+ * 2. If wildcard deny exists, return deny directly
+ * 3. For any literal allowed resource, if there's no dominant literal 
denied resource, and
+ *no dominant prefixed denied resource, return allow
+ * 4. For any prefixed allowed resource, if there's no dominant denied 
resource, return allow
+ * 5. For any other cases, return deny

Review comment:
   Since this is the javadoc of a public API, we should move the details on 
how the default implementation works outside of the javadoc. We can move this 
list of comments inside the method. 

##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +151,126 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * 1. Filter out all the resource pattern corresponding to the 
requestContext, AclOperation,
+ *and ResourceType
+ * 2. If wildcard deny exists, return deny directly
+ * 3. For any literal allowed resource, if there's no dominant literal 
denied resource, and
+ *no dominant prefixed denied resource, return allow
+ * 4. For any prefixed allowed resource, if there's no dominant denied 
resource, return allow
+ * 5. For any other cases, return deny
+ *
+ * It is important to override this interface default in implementations 
because
+ * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+ *which is a CPU intense work.
+ * 2. The interface default rebuild several sets of strings, which is a 
memory intense work.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);

Review comment:
   We don't currently have anything in the default implementation to 
support `super.users`, right? Unlike `allow.everyone.if.no.acl.found`, which is not 
particularly suitable for production use, `super.users` is a commonly used 
config that is likely to be in use in a lot of deployments. The simplest fix 
may be to call `authorize()` with a hard-coded name and, if it returns ALLOWED, 
return ALLOWED before any of the logic below is executed.

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-09 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r539594382



##
File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;

Review comment:
   This package is part of the public API, but the class looks like it 
should be internal?

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName).toString
+val host = requestContext.clientAddress().getHostAddress
+val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+val denyLiterals = matchingResources(
+  principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+if (denyAll(denyLiterals)) {
+  logAuditMessage(requestContext, action, false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, action, true)
+  return AuthorizationResult.ALLOWED
+}
+
+val allowLiterals = matchingResources(
+  principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+val allowPrefixes = matchingResources(
+  principal, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+val denyPrefixes = matchingResources(
+  principal, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+  logAuditMessage(requestContext, action, true)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, action, false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+resourceType: ResourceType, patternType: PatternType): 
List[mutable.HashSet[String]] = {
+var matched = List[mutable.HashSet[String]]()
+for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {

Review comment:
   Use `AclEntry.WildcardPrincipalString` instead of `AclEntry.WildcardPrincipal.toString`.

##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +181,32 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+if (denyAllResource(requestContext, op, resourceType)) {
+  AuthorizationResult.DENIED
+} else if (shouldAllowEveryoneIfNoAclIsFound) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+  op: AclOperation,
+  resourceType: ResourceType): Boolean = {
+val resourceTypeFilter = new ResourcePatternFilter(
+  resourceType, null, PatternType.ANY)
+val principal = new 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533510477



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")

Review comment:
   We should probably move this common code to SecurityUtils and use it 
both here and in the default implementation.

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName).toString
+
+val denyPatterns = matchingPatterns(
+  principal,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.DENY
+)
+
+if (denyAll(denyPatterns)) {
+  logAuditMessage(requestContext, new Action(op, null,0, true, true), 
false, false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, new Action(op, null, 0, true, true), 
true, false)
+  return AuthorizationResult.ALLOWED
+}
+
+val allowPatterns = matchingPatterns(
+  principal,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.ALLOW
+)
+
+if (allowAny(allowPatterns, denyPatterns)) {
+  logAuditMessage(requestContext, new Action(op,null, 0, true, true), 
true, false)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, new Action(op, null, 0, true, true), 
false, false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingPatterns(principal: String, host: String, op: AclOperation,
+   resourceType: ResourceType,
+   permission: AclPermissionType): Set[ResourcePattern] = {
+var resources = Set[ResourcePattern]()
+for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val ace = new AccessControlEntry(p, h, o, permission)
+  resourceCache.get(ace) match {
+case Some(r) => resources ++= r.filter(r => r.resourceType() == 
resourceType)
+case None =>
+  }
+}
+  }
+}
+resources
+  }
+
+  private def denyAll(denyResources: Set[ResourcePattern]): Boolean =
+denyResources.exists(rp => denyAll(rp))
+
+  private def denyAll(rp: ResourcePattern): Boolean =
+rp.patternType() == PatternType.LITERAL && rp.name() == 
ResourcePattern.WILDCARD_RESOURCE
+
+  private def allowAny(allowPatterns: Set[ResourcePattern], denyPatterns: 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-11-18 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r526479473



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +150,129 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.

Review comment:
   We should document what this default implementation does and why a 
custom implementation may want to override this default.

##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +150,129 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.

Review comment:
   We should document what this default implementation does and why a 
custom implementation may want to override this default.

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message

Review comment:
   We should have exactly one call to `logAuditMessage` that says whether 
access was allowed or denied.

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+val allowPatterns = matchingPatterns(
+  requestContext.principal().toString,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.ALLOW
+)
+
+val denyPatterns = matchingPatterns(
+  requestContext.principal().toString,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.DENY

Review comment:
   We should optimize for the case where there are no DENY ACLs. There is 
no point in finding all matching ALLOW entries in that case; we would just need 
to check for one ALLOW.

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##
@@ -988,6 +1000,369 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
 }
   }
 
+  @Test
+  def testAuthorizeAnyDurability(): Unit = {

Review comment:
   As before, references to Durability in authorizer tests are confusing.

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.java
##
@@ -0,0 +1,4 @@
+package unit.kafka.security.authorizer;
+
+public class AuthorizerInterfaceDefaultTest {

Review comment:
   Are we going to add tests here?

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing