akhileshchg commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r971088491
########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -109,14 +110,9 @@ public class StandardAuthorizerData { private final DefaultRule defaultRule; /** - * Contains all of the current ACLs sorted by (resource type, resource name). + * An immutable array of all the current ACLs sorted by (resource type, resource name). */ - private final ConcurrentSkipListSet<StandardAcl> aclsByResource; Review Comment: We're storing `Id` with the `StandardAcl`. Shouldn't that make it unique? I think since it is sorted, we can maybe have a conservative check to make sure there are no duplicate ids. ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ########## @@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) { } @Override - public void addAcl(Uuid id, StandardAcl acl) { - data.addAcl(id, acl); - } - - @Override - public void removeAcl(Uuid id) { - data.removeAcl(id); + public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) { + data = data.copyWithAllNewAcls(acls.entrySet()); } @Override - public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) { - data = data.copyWithNewAcls(acls.entrySet()); + public synchronized void applyAclChanges( + Collection<Map.Entry<Uuid, StandardAcl>> newAcls, Review Comment: I don't think `StandardAcl` has id with it. We have a different data structure for it`StandardAclWithId` ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ########## @@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize( return results; } Review Comment: In `authorize` function we still do `StandardAuthorizerData curData = data;`. I don't think we need to do this anymore. ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId, loadingComplete, newSuperUsers, newDefaultResult, - aclsByResource, - aclsById); + acls); } - StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) { - StandardAuthorizerData newData = new StandardAuthorizerData( - log, - aclMutator, - loadingComplete, - superUsers, - defaultRule.result, - new ConcurrentSkipListSet<>(), - new ConcurrentHashMap<>()); - for (Entry<Uuid, StandardAcl> entry : aclEntries) { - newData.addAcl(entry.getKey(), entry.getValue()); - } - log.info("Applied {} acl(s) from image.", aclEntries.size()); - return newData; + StandardAuthorizerData copyWithAllNewAcls( + Collection<Entry<Uuid, StandardAcl>> newAclEntries + ) { + return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet()); } - void addAcl(Uuid id, StandardAcl acl) { - try { - StandardAcl prevAcl = aclsById.putIfAbsent(id, acl); - if (prevAcl != null) { - throw new RuntimeException("An ACL with ID " + id + " already exists."); - } - if (!aclsByResource.add(acl)) { - aclsById.remove(id); - throw new RuntimeException("Unable to add the ACL with ID " + id + - " to aclsByResource"); - } - log.trace("Added ACL {}: {}", id, acl); - } catch (Throwable e) { - log.error("addAcl error", e); - throw e; - } + StandardAuthorizerData copyWithAclChanges( + Collection<Entry<Uuid, StandardAcl>> newAclEntries, + Set<Uuid> removedAclIds + ) { + return copyWithNewAcls(acls, newAclEntries, removedAclIds); } - void removeAcl(Uuid id) { - try { - StandardAcl acl = aclsById.remove(id); - if (acl == null) { - throw new RuntimeException("ID " + id + " not found in aclsById."); + StandardAuthorizerData copyWithNewAcls( + StandardAclWithId[] existingAcls, + Collection<Entry<Uuid, StandardAcl>> newAclEntries, + Set<Uuid> removedAclIds + ) { + StandardAclWithId[] newAcls = new StandardAclWithId[ + existingAcls.length + newAclEntries.size() - removedAclIds.size()]; + int numRemoved = 0, j = 0; + for (int i = 0; i < existingAcls.length; i++) { + StandardAclWithId aclWithId = existingAcls[i]; + if (removedAclIds.contains(aclWithId.id())) { + numRemoved++; + } else { + newAcls[j++] = aclWithId; } - if (!aclsByResource.remove(acl)) { - throw new RuntimeException("Unable to remove the ACL with ID " + id + - " from aclsByResource"); + } + if (numRemoved < removedAclIds.size()) { + throw new RuntimeException("Only located " + numRemoved + " out of " + + removedAclIds.size() + " removed ACL ID(s). removedAclIds = " + + removedAclIds.stream().map(a -> a.toString()).collect(Collectors.joining(", "))); + } + if (!newAclEntries.isEmpty()) { + int i = 0; + for (Entry<Uuid, StandardAcl> entry : newAclEntries) { + newAcls[existingAcls.length + i] = new StandardAclWithId(entry.getKey(), entry.getValue()); + i++; } Review Comment: Compared to the previous code, we're missing the check for duplicate ids. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org