This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 583acb60d68 MINOR: Restore original behavior of 
GroupAuthorizerIntegrationTest (#19968)
583acb60d68 is described below

commit 583acb60d68795f5e55d7ac5ac3b0f0b1508a611
Author: Nick Guo <[email protected]>
AuthorDate: Sun Jun 22 22:08:20 2025 +0800

    MINOR: Restore original behavior of GroupAuthorizerIntegrationTest (#19968)
    
    this is a follow-up for https://github.com/apache/kafka/pull/19685
    
    The timeout issue in `AsyncConsumer#unsubscribe` was fixed by
    https://github.com/apache/kafka/pull/19779. As a result, the test
    
    
`GroupAuthorizerIntegrationTest#testConsumeUnsubscribeWithoutGroupPermission`
    should now retain its original behavior as expected prior to the issue.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../security/GroupAuthorizerIntegrationTest.java   | 50 ++++++++++++++++++++--
 1 file changed, 46 insertions(+), 4 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
index 725c0f53786..a1eb4c4c027 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
@@ -141,7 +141,9 @@ public class GroupAuthorizerIntegrationTest {
     }
 
     private void addAndVerifyAcls(Set<AccessControlEntry> acls, 
ResourcePattern resource, ClusterInstance clusterInstance) throws 
InterruptedException {
-        List<AclBinding> aclBindings = acls.stream().map(acl -> new 
AclBinding(resource, acl)).toList();
+        List<AclBinding> aclBindings = acls.stream()
+                .map(acl -> new AclBinding(resource, acl))
+                .toList();
         Authorizer authorizer = getAuthorizer(clusterInstance);
         authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings)
                 .forEach(future -> {
@@ -155,6 +157,29 @@ public class GroupAuthorizerIntegrationTest {
         clusterInstance.waitAcls(aclBindingFilter, acls);
     }
 
+    private void removeAndVerifyAcls(Set<AccessControlEntry> deleteAcls, 
ResourcePattern resource, ClusterInstance clusterInstance) throws 
InterruptedException {
+        List<AclBindingFilter> aclBindingFilters = deleteAcls.stream()
+                .map(acl -> new AclBindingFilter(resource.toFilter(), 
acl.toFilter()))
+                .toList();
+        Authorizer authorizer = getAuthorizer(clusterInstance);
+        authorizer.deleteAcls(ANONYMOUS_CONTEXT, aclBindingFilters)
+                .forEach(future -> {
+                    try {
+                        future.toCompletableFuture().get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        throw new RuntimeException("Failed to delete ACLs", e);
+                    }
+                });
+
+        AclBindingFilter aclBindingFilter = new 
AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY);
+        TestUtils.waitForCondition(() -> {
+            Set<AccessControlEntry> remainingAclEntries = new HashSet<>();
+            authorizer.acls(aclBindingFilter).forEach(aclBinding -> 
remainingAclEntries.add(aclBinding.entry()));
+            return 
deleteAcls.stream().noneMatch(remainingAclEntries::contains);
+        }, "Failed to verify ACLs deletion");
+    }
+
+
     static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new 
AuthorizableRequestContext() {
         @Override
         public String listenerName() {
@@ -253,17 +278,27 @@ public class GroupAuthorizerIntegrationTest {
         }
     }
 
+    @ClusterTest
+    public void 
testClassicConsumeUnsubscribeWithGroupPermission(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
+        testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, 
GroupProtocol.CLASSIC, true);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumeUnsubscribeWithGroupPermission(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
+        testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, 
GroupProtocol.CONSUMER, true);
+    }
+
     @ClusterTest
     public void 
testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
-        testConsumeUnsubscribeWithGroupPermission(clusterInstance, 
GroupProtocol.CLASSIC);
+        testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, 
GroupProtocol.CLASSIC, false);
     }
 
     @ClusterTest
     public void 
testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
-        testConsumeUnsubscribeWithGroupPermission(clusterInstance, 
GroupProtocol.CONSUMER);
+        testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance, 
GroupProtocol.CONSUMER, false);
     }
 
-    private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, 
ExecutionException {
+    private void 
testConsumeUnsubscribeWithOrWithoutGroupPermission(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol, boolean withGroupPermission) 
throws InterruptedException, ExecutionException {
         setup(clusterInstance);
         String topic = "topic";
         String group = "group";
@@ -297,6 +332,13 @@ public class GroupAuthorizerIntegrationTest {
                 ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofSeconds(15));
                 return records.count() == 1;
             }, "consumer failed to receive message");
+            if (!withGroupPermission) {
+                removeAndVerifyAcls(
+                        Set.of(createAcl(AclOperation.READ, 
AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+                        new ResourcePattern(ResourceType.GROUP, group, 
PatternType.LITERAL),
+                        clusterInstance
+                );
+            }
             assertDoesNotThrow(consumer::unsubscribe);
         }
     }

Reply via email to