d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545577832



##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,133 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on 
at least one
+     * resource of the given type.
+     *
+     * It is important to override this interface default in implementations 
because
+     * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+     *    which is CPU-intensive work.
+     * 2. The interface default rebuilds several sets of strings, which is 
memory-intensive work.
+     * 3. The interface default cannot perform the audit logging properly.
+     *
+     * @param requestContext Request context including request type, 
security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+     *                       given ACL operation on at least one resource of 
the given type.
+     *                       Return {@link AuthorizationResult#DENIED} 
otherwise.
+     */
+    default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        if (authorize(requestContext, Collections.singletonList(new Action(

Review comment:
       commit b6a766b

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,133 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on 
at least one
+     * resource of the given type.
+     *
+     * It is important to override this interface default in implementations 
because
+     * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+     *    which is CPU-intensive work.
+     * 2. The interface default rebuilds several sets of strings, which is 
memory-intensive work.
+     * 3. The interface default cannot perform the audit logging properly.
+     *
+     * @param requestContext Request context including request type, 
security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+     *                       given ACL operation on at least one resource of 
the given type.
+     *                       Return {@link AuthorizationResult#DENIED} 
otherwise.
+     */
+    default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        if (authorize(requestContext, Collections.singletonList(new Action(
+                op, new ResourcePattern(resourceType, "hardcode", 
PatternType.LITERAL),
+                0, true, false)))
+                .get(0) == AuthorizationResult.ALLOWED) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        // Select all the resource patterns that match the 
RequestContext,
+        // AclOperation, and ResourceType
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        EnumMap<PatternType, Set<String>> denyPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+        EnumMap<PatternType, Set<String>> allowPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+
+        boolean hasWildCardAllow = false;
+
+        KafkaPrincipal principal = new KafkaPrincipal(
+            requestContext.principal().getPrincipalType(),
+            requestContext.principal().getName());
+        String hostAddr = requestContext.clientAddress().getHostAddress();
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(hostAddr) && 
!binding.entry().host().equals("*"))
+                continue;
+
+            if 
(!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+                    && !binding.entry().principal().equals("User:*"))
+                continue;
+
+            if (binding.entry().operation() != op
+                    && binding.entry().operation() != AclOperation.ALL)
+                continue;
+
+            if (binding.entry().permissionType() == AclPermissionType.DENY) {
+                switch (binding.pattern().patternType()) {
+                    case LITERAL:
+                        // If wildcard deny exists, return deny directly
+                        if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+                            return AuthorizationResult.DENIED;
+                        
denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+                        break;
+                    case PREFIXED:
+                        
denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+                        break;
+                    default:
+                }
+                continue;
+            }
+
+            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+                continue;
+
+            switch (binding.pattern().patternType()) {
+                case LITERAL:
+                    if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+                        hasWildCardAllow = true;
+                        continue;
+                    }
+                    
allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+                    break;
+                case PREFIXED:
+                    
allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+                    break;
+                default:
+            }
+        }
+
+        if (hasWildCardAllow) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        // For any allowed literal, if no denied literal or denied
+        // prefix dominates it, return allow.
+        // For any allowed prefix, if no denied prefix dominates it,
+        // return allow.

Review comment:
       commit b6a766b




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to