mumrah commented on code in PR #12636: URL: https://github.com/apache/kafka/pull/12636#discussion_r970977191
########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -397,18 +387,34 @@ private MatchingAclRule findAclRule( return matchingAclBuilder.build(); } + /** + * Use a binary search to find the index of the first ACL which is greater than or + * equal to the given ACL. This may be equal to the end of the array if there are + * no such ACLs. + */ + private int indexOfFirstAclGreaterThanOrEqualTo(StandardAcl exemplar) { + int i = Arrays.binarySearch(acls, + new StandardAclWithId(Uuid.ZERO_UUID, exemplar), + StandardAclWithId.ACL_COMPARATOR); + // Arrays.binarySearch returns a positive number if it found an exact match, and + // a negative number otherwise. Review Comment: Might comment about what the negative return value indicates. It helps L404 make more sense :) ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -109,14 +110,9 @@ public class StandardAuthorizerData { private final DefaultRule defaultRule; /** - * Contains all of the current ACLs sorted by (resource type, resource name). + * An immutable array of all the current ACLs sorted by (resource type, resource name). */ - private final ConcurrentSkipListSet<StandardAcl> aclsByResource; Review Comment: I see we are replacing the skip-list set with a sorted array. Don't we need to guard against duplicates in the array? 
If we used a TreeSet here, it would be closer to the current implementation, and I think copying from another TreeSet should take linear time. ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ########## @@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) { } @Override - public void addAcl(Uuid id, StandardAcl acl) { - data.addAcl(id, acl); - } - - @Override - public void removeAcl(Uuid id) { - data.removeAcl(id); + public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) { + data = data.copyWithAllNewAcls(acls.entrySet()); } @Override - public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) { - data = data.copyWithNewAcls(acls.entrySet()); + public synchronized void applyAclChanges( + Collection<Map.Entry<Uuid, StandardAcl>> newAcls, Review Comment: Could we just take a `Collection<StandardAcl>` since the ACL has the ID as a property? ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -109,14 +110,9 @@ public class StandardAuthorizerData { private final DefaultRule defaultRule; /** - * Contains all of the current ACLs sorted by (resource type, resource name). + * An immutable array of all the current ACLs sorted by (resource type, resource name). */ - private final ConcurrentSkipListSet<StandardAcl> aclsByResource; - - /** - * Contains all of the current ACLs indexed by UUID. - */ - private final ConcurrentHashMap<Uuid, StandardAcl> aclsById; Review Comment: Guess we don't need this anymore if we are removing `addAcl` and `removeAcl`? 
########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId, loadingComplete, newSuperUsers, newDefaultResult, - aclsByResource, - aclsById); + acls); } - StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) { - StandardAuthorizerData newData = new StandardAuthorizerData( - log, - aclMutator, - loadingComplete, - superUsers, - defaultRule.result, - new ConcurrentSkipListSet<>(), - new ConcurrentHashMap<>()); - for (Entry<Uuid, StandardAcl> entry : aclEntries) { - newData.addAcl(entry.getKey(), entry.getValue()); - } - log.info("Applied {} acl(s) from image.", aclEntries.size()); - return newData; + StandardAuthorizerData copyWithAllNewAcls( + Collection<Entry<Uuid, StandardAcl>> newAclEntries + ) { + return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet()); } - void addAcl(Uuid id, StandardAcl acl) { - try { - StandardAcl prevAcl = aclsById.putIfAbsent(id, acl); - if (prevAcl != null) { - throw new RuntimeException("An ACL with ID " + id + " already exists."); - } - if (!aclsByResource.add(acl)) { - aclsById.remove(id); - throw new RuntimeException("Unable to add the ACL with ID " + id + - " to aclsByResource"); - } - log.trace("Added ACL {}: {}", id, acl); - } catch (Throwable e) { - log.error("addAcl error", e); - throw e; - } + StandardAuthorizerData copyWithAclChanges( + Collection<Entry<Uuid, StandardAcl>> newAclEntries, + Set<Uuid> removedAclIds + ) { + return copyWithNewAcls(acls, newAclEntries, removedAclIds); } - void removeAcl(Uuid id) { - try { - StandardAcl acl = aclsById.remove(id); - if (acl == null) { - throw new RuntimeException("ID " + id + " not found in aclsById."); + StandardAuthorizerData copyWithNewAcls( + StandardAclWithId[] existingAcls, + Collection<Entry<Uuid, StandardAcl>> newAclEntries, + Set<Uuid> removedAclIds + ) { + StandardAclWithId[] newAcls = new 
StandardAclWithId[ + existingAcls.length + newAclEntries.size() - removedAclIds.size()]; Review Comment: nit: pull the size into a local variable so we don't break the line on the array brackets. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org