This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 713db446b0c KAFKA-19411: Fix deleteAcls bug which allows more 
deletions than max records per user op (#19974)
713db446b0c is described below

commit 713db446b0cbb598247bd442e7a76d38b59d99ca
Author: Alyssa Huang <[email protected]>
AuthorDate: Tue Jun 24 11:24:50 2025 -0700

    KAFKA-19411: Fix deleteAcls bug which allows more deletions than max 
records per user op (#19974)
    
    If there are more deletion filters after we initially hit the
    `MAX_RECORDS_PER_USER_OP` bound, we will add an additional deletion
    record on top of that for each additional filter.
    
    The current error message returned to the client is not useful either,
    so this change adds logic so that the client doesn't just get
    `UNKNOWN_SERVER_EXCEPTION` with no details returned.
    
    Conflicts:
    - In AclControlManagerTest.java, use `!isPresent()` instead of `isEmpty()`
      so that this will work with Java 8.
---
 .../apache/kafka/controller/AclControlManager.java | 13 +++--
 .../kafka/controller/AclControlManagerTest.java    | 58 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 4 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
index ca324ab5788..47719734287 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
@@ -176,6 +176,10 @@ public class AclControlManager {
                 validateFilter(filter);
                 AclDeleteResult result = deleteAclsForFilter(filter, records);
                 results.add(result);
+            } catch (BoundedListTooLongException e) {
+                // we do not return partial results here because the fact that 
only a portion of the deletions
+                // succeeded can be easily missed due to response size. 
instead fail the entire response
+                throw new InvalidRequestException(e.getMessage(), e);
             } catch (Throwable e) {
                 results.add(new 
AclDeleteResult(ApiError.fromThrowable(e).exception()));
             }
@@ -191,13 +195,14 @@ public class AclControlManager {
             StandardAcl acl = entry.getValue();
             AclBinding binding = acl.toBinding();
             if (filter.matches(binding)) {
-                deleted.add(new AclBindingDeleteResult(binding));
-                records.add(new ApiMessageAndVersion(
-                    new RemoveAccessControlEntryRecord().setId(id), (short) 
0));
-                if (records.size() > MAX_RECORDS_PER_USER_OP) {
+                // check size limitation first before adding additional records
+                if (records.size() >= MAX_RECORDS_PER_USER_OP) {
                     throw new BoundedListTooLongException("Cannot remove more 
than " +
                         MAX_RECORDS_PER_USER_OP + " acls in a single delete 
operation.");
                 }
+                deleted.add(new AclBindingDeleteResult(binding));
+                records.add(new ApiMessageAndVersion(
+                    new RemoveAccessControlEntryRecord().setId(id), (short) 
0));
             }
         }
         return new AclDeleteResult(deleted);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index b4b2089528f..487ae2d6cef 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -48,6 +48,7 @@ import 
org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.junit.jupiter.api.Test;
@@ -71,6 +72,7 @@ import static 
org.apache.kafka.common.acl.AclPermissionType.ALLOW;
 import static org.apache.kafka.common.resource.PatternType.LITERAL;
 import static org.apache.kafka.common.resource.PatternType.MATCH;
 import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static 
org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
 import static 
org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -366,4 +368,60 @@ public class AclControlManagerTest {
         assertEquals(id, ((RemoveAccessControlEntryRecord) 
deleteAclResultsBothFilters.records().get(0).message()).id());
         assertEquals(2, deleteAclResultsBothFilters.response().size());
     }
+
+    @Test
+    public void testDeleteExceedsMaxRecords() {
+        AclControlManager manager = new AclControlManager.Builder().build();
+        MockClusterMetadataAuthorizer authorizer = new 
MockClusterMetadataAuthorizer();
+        authorizer.loadSnapshot(manager.idToAcl());
+
+        List<AclBinding> firstCreate = new ArrayList<>();
+        List<AclBinding> secondCreate = new ArrayList<>();
+
+        // create MAX_RECORDS_PER_USER_OP + 2 ACLs
+        for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
+            StandardAclWithId acl = new StandardAclWithId(Uuid.randomUuid(),
+                new StandardAcl(
+                    ResourceType.TOPIC,
+                    "mytopic_" + i,
+                    PatternType.LITERAL,
+                    "User:alice",
+                    "127.0.0.1",
+                    AclOperation.READ,
+                    AclPermissionType.ALLOW));
+
+            // split acl creations between two create requests
+            if (i % 2 == 0) {
+                firstCreate.add(acl.toBinding());
+            } else {
+                secondCreate.add(acl.toBinding());
+            }
+        }
+        ControllerResult<List<AclCreateResult>> firstCreateResult = 
manager.createAcls(firstCreate);
+        assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, 
firstCreateResult.response().size());
+        for (AclCreateResult result : firstCreateResult.response()) {
+            assertTrue(!result.exception().isPresent());
+        }
+
+        ControllerResult<List<AclCreateResult>> secondCreateResult = 
manager.createAcls(secondCreate);
+        assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, 
secondCreateResult.response().size());
+        for (AclCreateResult result : secondCreateResult.response()) {
+            assertTrue(!result.exception().isPresent());
+        }
+
+        RecordTestUtils.replayAll(manager, firstCreateResult.records());
+        RecordTestUtils.replayAll(manager, secondCreateResult.records());
+        assertFalse(manager.idToAcl().isEmpty());
+
+        ArrayList<AclBindingFilter> filters = new ArrayList<>();
+        for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
+            filters.add(new AclBindingFilter(
+                new ResourcePatternFilter(ResourceType.TOPIC, "mytopic_" + i, 
PatternType.LITERAL),
+                AccessControlEntryFilter.ANY));
+        }
+
+        Exception exception = assertThrows(InvalidRequestException.class, () 
-> manager.deleteAcls(filters));
+        assertEquals(BoundedListTooLongException.class, 
exception.getCause().getClass());
+        assertEquals("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " 
acls in a single delete operation.", exception.getCause().getMessage());
+    }
 }

Reply via email to