emissionnebula commented on code in PR #13437: URL: https://github.com/apache/kafka/pull/13437#discussion_r1177916880
########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclCache.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.authorizer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.server.immutable.ImmutableMap; +import org.apache.kafka.server.immutable.ImmutableNavigableSet; + +import java.util.ArrayList; +import java.util.List; + +/** + * An immutable class that stores the ACLs in KRaft-based clusters. + */ +public class AclCache { + /** + * Contains all of the current ACLs sorted by (resource type, resource name). + */ + private final ImmutableNavigableSet<StandardAcl> aclsByResource; + + /** + * Contains all of the current ACLs indexed by UUID. 
+ */ + private final ImmutableMap<Uuid, StandardAcl> aclsById; + + AclCache() { + this(ImmutableNavigableSet.empty(), ImmutableMap.empty()); + } + + private AclCache(final ImmutableNavigableSet<StandardAcl> aclsByResource, final ImmutableMap<Uuid, StandardAcl> aclsById) { + this.aclsByResource = aclsByResource; + this.aclsById = aclsById; + } + + public ImmutableNavigableSet<StandardAcl> aclsByResource() { + return aclsByResource; + } + + Iterable<AclBinding> acls(AclBindingFilter filter) { + List<AclBinding> aclBindingList = new ArrayList<>(); + aclsByResource.forEach(acl -> { + AclBinding aclBinding = acl.toBinding(); + if (filter.matches(aclBinding)) { + aclBindingList.add(aclBinding); + } + }); + return aclBindingList; + } + + int count() { + return aclsById.size(); + } + + StandardAcl getAcl(Uuid id) { + return aclsById.get(id); + } + + AclCache addAcl(Uuid id, StandardAcl acl) { Review Comment: > > _Since writes are done on a single thread, the only case of concurrency we have to solve here is when multiple reads and a single write are happening in parallel._ > > Do I get this right that the single writer assumption stated in the PR description is critical to achieve consistency in the sequence of operations below? (e.g. that the state checked line 77 is still valid line 81). Should multiple writes happen concurrently, this would not be the case, right? Is there a way to enforce the single writer condition? Or, shouldn't the cache preserve consistency under multiple writers (since it has no control over how many actors can update its state concurrently)? Thanks @Hangleton for the comment. The single-writer condition will always hold for the Authorizer because we have to apply the ACL changes in the order of their arrival. In the case of KRaft, that order is the order in which they are written to the metadata topic, so we would never allow multiple threads to read from the metadata topic and write to AclCache. Because of this, I didn't add a lock on writes here. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org