mumrah commented on code in PR #12628:
URL: https://github.com/apache/kafka/pull/12628#discussion_r975497294


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -530,54 +545,14 @@ static AuthorizationResult findResult(Action action,
     }
 
     Iterable<AclBinding> acls(AclBindingFilter filter) {

Review Comment:
   Let's add a Javadoc to this method indicating that it returns a copy of the `AclBinding`s.
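
   A minimal sketch of what such a Javadoc could look like, assuming the method builds a fresh list on every call. `AclCopySketch` and its `String` bindings are hypothetical stand-ins for `StandardAuthorizerData` and `AclBinding`, not the actual classes in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical stand-in for StandardAuthorizerData; Strings stand in for AclBinding.
final class AclCopySketch {
    private final TreeSet<String> bindings = new TreeSet<>();

    void add(String binding) {
        bindings.add(binding);
    }

    /**
     * Returns the bindings that match the given filter.
     *
     * <p>The result is a copy made at call time. Later changes to this object
     * are not reflected in it, so callers can iterate it after the call returns.
     */
    Iterable<String> acls(Predicate<String> filter) {
        List<String> copy = new ArrayList<>();
        for (String binding : bindings) {
            if (filter.test(binding)) {
                copy.add(binding);
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        AclCopySketch data = new AclCopySketch();
        data.add("topic:foo");
        Iterable<String> snapshot = data.acls(b -> b.startsWith("topic:"));
        data.add("topic:bar"); // added after the copy was taken, so not in snapshot
        snapshot.forEach(System.out::println);
    }
}
```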



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -193,15 +194,29 @@ StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclE
             loadingComplete,
             superUsers,
             defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
+            new TreeSet<>(),
+            new HashMap<>());
         for (Entry<Uuid, StandardAcl> entry : aclEntries) {
             newData.addAcl(entry.getKey(), entry.getValue());
         }
         log.info("Applied {} acl(s) from image.", aclEntries.size());
         return newData;
     }
 
+    StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl> aclsByResource, HashMap<Uuid,

Review Comment:
   Is the other `copyWithNewAcls` still used? Can we remove it?



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -58,28 +59,46 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
      */
     private final CompletableFuture<Void> initialLoadFuture = new CompletableFuture<>();
 
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
     /**
-     * The current data. Can be read without a lock. Must be written while holding the object lock.
+     * The current data. We use a read-write lock to synchronize reads and writes to the data.

Review Comment:
   Let's mention that we expect a single writer and multiple readers for this class, and that the locks are here to ensure consistency (i.e., the issue you found).
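
   One way the comment might read, as a rough sketch. `SingleWriterSketch`, its `HashMap` payload, and the method names are assumptions for illustration only; the real field and its Javadoc live in `StandardAuthorizer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; not the actual StandardAuthorizer.
final class SingleWriterSketch {
    /**
     * Guards {@link #data}. We expect a single writer (the thread applying
     * metadata updates) and multiple readers (request handler threads). The
     * lock ensures that readers never observe a partially applied update.
     */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** The current data. Read under the read lock, modified under the write lock. */
    private final Map<String, String> data = new HashMap<>();

    // Writer path: exclusive access while the map is modified.
    void addAcl(String id, String acl) {
        lock.writeLock().lock();
        try {
            data.put(id, acl);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Reader path: many threads may hold the read lock at the same time.
    int size() {
        lock.readLock().lock();
        try {
            return data.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        SingleWriterSketch sketch = new SingleWriterSketch();
        sketch.addAcl("id-1", "allow read on topic foo");
        System.out.println(sketch.size());
    }
}
```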



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -129,23 +167,37 @@ public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
     public List<AuthorizationResult> authorize(
             AuthorizableRequestContext requestContext,
             List<Action> actions) {
-        StandardAuthorizerData curData = data;
         List<AuthorizationResult> results = new ArrayList<>(actions.size());
-        for (Action action: actions) {
-            AuthorizationResult result = curData.authorize(requestContext, action);
-            results.add(result);
+        lock.readLock().lock();
+        try {
+            for (Action action : actions) {
+                AuthorizationResult result = data.authorize(requestContext, action);
+                results.add(result);
+            }
+        } finally {
+            lock.readLock().unlock();
         }
         return results;
     }
 
     @Override
     public Iterable<AclBinding> acls(AclBindingFilter filter) {
-        return data.acls(filter);
+        lock.readLock().lock();
+        try {
+            return data.acls(filter);

Review Comment:
   Can you add a comment here that the returned `Iterable` is consistent?



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -129,23 +167,37 @@ public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
     public List<AuthorizationResult> authorize(
             AuthorizableRequestContext requestContext,
             List<Action> actions) {
-        StandardAuthorizerData curData = data;
         List<AuthorizationResult> results = new ArrayList<>(actions.size());
-        for (Action action: actions) {
-            AuthorizationResult result = curData.authorize(requestContext, action);
-            results.add(result);
+        lock.readLock().lock();
+        try {
+            for (Action action : actions) {
+                AuthorizationResult result = data.authorize(requestContext, action);

Review Comment:
   Even while holding the lock, we should still load `data` into a local variable to avoid reading the volatile field multiple times.
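
   Roughly what I have in mind, as a sketch rather than the exact change. `LocalReadSketch` is hypothetical, and a `Predicate<String>` stands in for `StandardAuthorizerData`; the point is only the single read of the volatile field inside the read lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

// Hypothetical sketch; Predicate<String> stands in for StandardAuthorizerData.
final class LocalReadSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile Predicate<String> data = action -> false;

    // Writer path: swap in new data under the write lock.
    void replace(Predicate<String> newData) {
        lock.writeLock().lock();
        try {
            data = newData;
        } finally {
            lock.writeLock().unlock();
        }
    }

    List<Boolean> authorize(List<String> actions) {
        List<Boolean> results = new ArrayList<>(actions.size());
        lock.readLock().lock();
        try {
            // Read the volatile field once; the loop then uses the local reference.
            Predicate<String> curData = data;
            for (String action : actions) {
                results.add(curData.test(action));
            }
        } finally {
            lock.readLock().unlock();
        }
        return results;
    }

    public static void main(String[] args) {
        LocalReadSketch sketch = new LocalReadSketch();
        sketch.replace(action -> action.startsWith("read"));
        System.out.println(sketch.authorize(List.of("read:foo", "write:foo")));
    }
}
```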


