mumrah commented on code in PR #12628: URL: https://github.com/apache/kafka/pull/12628#discussion_r975497294
########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -530,54 +545,14 @@ static AuthorizationResult findResult(Action action, } Iterable<AclBinding> acls(AclBindingFilter filter) { Review Comment: Let's add a javadoc to this method indicating that we are returning a copy of the AclBinding-s ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ########## @@ -193,15 +194,29 @@ StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclE loadingComplete, superUsers, defaultRule.result, - new ConcurrentSkipListSet<>(), - new ConcurrentHashMap<>()); + new TreeSet<>(), + new HashMap<>()); for (Entry<Uuid, StandardAcl> entry : aclEntries) { newData.addAcl(entry.getKey(), entry.getValue()); } log.info("Applied {} acl(s) from image.", aclEntries.size()); return newData; } + StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl> aclsByResource, HashMap<Uuid, Review Comment: Is the other `copyWithNewAcls` still used? Can we remove it? ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ########## @@ -58,28 +59,46 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { */ private final CompletableFuture<Void> initialLoadFuture = new CompletableFuture<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** - * The current data. Can be read without a lock. Must be written while holding the object lock. + * The current data. We use a read-write lock to synchronize reads and writes to the data. Review Comment: Let's mention that we expect a single writer and multiple readers for this class and that the locks are here to ensure consistency (i.e., the issue you found) ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ########## @@ -129,23 +167,37 @@ public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) { public List<AuthorizationResult> authorize( AuthorizableRequestContext requestContext, List<Action> actions) { - StandardAuthorizerData curData = data; List<AuthorizationResult> results = new ArrayList<>(actions.size()); - for (Action action: actions) { - AuthorizationResult result = curData.authorize(requestContext, action); - results.add(result); + lock.readLock().lock(); + try { + for (Action action : actions) { + AuthorizationResult result = data.authorize(requestContext, action); + results.add(result); + } + } finally { + lock.readLock().unlock(); } return results; } @Override public Iterable<AclBinding> acls(AclBindingFilter filter) { - return data.acls(filter); + lock.readLock().lock(); + try { + return data.acls(filter); Review Comment: Can you add a comment here that the returned `Iterable` is consistent? ########## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ########## @@ -129,23 +167,37 @@ public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) { public List<AuthorizationResult> authorize( AuthorizableRequestContext requestContext, List<Action> actions) { - StandardAuthorizerData curData = data; List<AuthorizationResult> results = new ArrayList<>(actions.size()); - for (Action action: actions) { - AuthorizationResult result = curData.authorize(requestContext, action); - results.add(result); + lock.readLock().lock(); + try { + for (Action action : actions) { + AuthorizationResult result = data.authorize(requestContext, action); Review Comment: Even in the lock, we should still load `data` into a local variable to avoid loading the volatile multiple times -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org