d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533915036



##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,125 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on 
at least one
+     * resource of the given type.
+     *
+     * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+     *                       given ACL operation on at least one resource of 
the given type.
+     *                       Return {@link AuthorizationResult#DENIED} 
otherwise.
+     */
+    default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+        if (resourceType == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for 
authorizeByResourceType");
+        }
+
+        if (resourceType == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for 
authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        final int typeLiteral = 0;
+        final int typePrefix = 1;
+
+        List<Set<String>> deny = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+        List<Set<String>> allow = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+        boolean hasWildCardAllow = false;
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if 
(!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+                    && !binding.entry().host().equals("*"))
+                continue;
+
+            KafkaPrincipal principal = new KafkaPrincipal(
+                requestContext.principal().getPrincipalType(),
+                requestContext.principal().getName());
+
+            if 
(!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+                    && !binding.entry().principal().equals("User:*"))
+                continue;
+
+            if (binding.entry().operation() != op
+                    && binding.entry().operation() != AclOperation.ALL)
+                continue;
+
+            if (binding.entry().permissionType() == AclPermissionType.DENY) {
+                switch (binding.pattern().patternType()) {
+                    case LITERAL:
+                        if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+                            return AuthorizationResult.DENIED;
+                        deny.get(typeLiteral).add(binding.pattern().name());
+                        break;
+                    case PREFIXED:
+                        deny.get(typePrefix).add(binding.pattern().name());
+                        break;
+                }
+                continue;
+            }
+
+            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+                continue;
+
+            switch (binding.pattern().patternType()) {
+                case LITERAL:
+                    if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+                        hasWildCardAllow = true;
+                        continue;
+                    }
+                    allow.get(typeLiteral).add(binding.pattern().name());
+                    break;
+                case PREFIXED:
+                    allow.get(typePrefix).add(binding.pattern().name());
+                    break;
+            }
+        }
+
+        if (hasWildCardAllow) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        for (int allowType : Arrays.asList(typePrefix, typeLiteral)) {

Review comment:
       EnumMap make sense. commit 1a139ce744a279e4424188008ee5158186b0fcbe




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to