cmccabe commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r802165841



##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern 
type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> 
aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult 
newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
+                    " to aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Added ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
+                    " from aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Removed ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to 
examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no 
matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return 
the default
+     * result. In general it makes more sense to configure the default result 
to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" + action +
+                    "): ALLOWED because " + 
requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        // This code relies on the ordering of StandardAcl within the 
NavigableMap.
+        // Entries are sorted by resource type first, then REVERSE resource 
name.
+        // Therefore, we can find all the applicable ACLs by starting at
+        // (resource_type, resource_name) and stepping forwards until we reach 
an ACL with
+        // a resource name which is not a prefix of the current one.
+        //
+        // For example, when trying to authorize a TOPIC resource named 
foobar, we would
+        // start at element 2, and continue on to 3 and 4 following map:
+        //
+        // 1. rs=TOPIC rn=gar pt=PREFIX
+        // 2. rs=TOPIC rn=foobar pt=PREFIX
+        // 3. rs=TOPIC rn=foob pt=LITERAL
+        // 4. rs=TOPIC rn=foo pt=PREFIX
+        // 5. rs=TOPIC rn=eeee pt=LITERAL
+        //
+        // Once we reached element 5, we would stop scanning.
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PatternType.UNKNOWN,

Review comment:
       Good point. I will add a comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to