This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 583acb60d68 MINOR: Restore original behavior of
GroupAuthorizerIntegrationTest (#19968)
583acb60d68 is described below
commit 583acb60d68795f5e55d7ac5ac3b0f0b1508a611
Author: Nick Guo <[email protected]>
AuthorDate: Sun Jun 22 22:08:20 2025 +0800
MINOR: Restore original behavior of GroupAuthorizerIntegrationTest (#19968)
This is a follow-up to https://github.com/apache/kafka/pull/19685
The timeout issue in `AsyncConsumer#unsubscribe` was fixed by
https://github.com/apache/kafka/pull/19779. As a result, the test
`GroupAuthorizerIntegrationTest#testConsumeUnsubscribeWithoutGroupPermission`
can now be restored to the behavior it had before that issue.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../security/GroupAuthorizerIntegrationTest.java | 50 ++++++++++++++++++++--
1 file changed, 46 insertions(+), 4 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
index 725c0f53786..a1eb4c4c027 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
@@ -141,7 +141,9 @@ public class GroupAuthorizerIntegrationTest {
}
private void addAndVerifyAcls(Set<AccessControlEntry> acls,
ResourcePattern resource, ClusterInstance clusterInstance) throws
InterruptedException {
- List<AclBinding> aclBindings = acls.stream().map(acl -> new
AclBinding(resource, acl)).toList();
+ List<AclBinding> aclBindings = acls.stream()
+ .map(acl -> new AclBinding(resource, acl))
+ .toList();
Authorizer authorizer = getAuthorizer(clusterInstance);
authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings)
.forEach(future -> {
@@ -155,6 +157,29 @@ public class GroupAuthorizerIntegrationTest {
clusterInstance.waitAcls(aclBindingFilter, acls);
}
+ private void removeAndVerifyAcls(Set<AccessControlEntry> deleteAcls,
ResourcePattern resource, ClusterInstance clusterInstance) throws
InterruptedException {
+ List<AclBindingFilter> aclBindingFilters = deleteAcls.stream()
+ .map(acl -> new AclBindingFilter(resource.toFilter(),
acl.toFilter()))
+ .toList();
+ Authorizer authorizer = getAuthorizer(clusterInstance);
+ authorizer.deleteAcls(ANONYMOUS_CONTEXT, aclBindingFilters)
+ .forEach(future -> {
+ try {
+ future.toCompletableFuture().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Failed to delete ACLs", e);
+ }
+ });
+
+ AclBindingFilter aclBindingFilter = new
AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY);
+ TestUtils.waitForCondition(() -> {
+ Set<AccessControlEntry> remainingAclEntries = new HashSet<>();
+ authorizer.acls(aclBindingFilter).forEach(aclBinding ->
remainingAclEntries.add(aclBinding.entry()));
+ return
deleteAcls.stream().noneMatch(remainingAclEntries::contains);
+ }, "Failed to verify ACLs deletion");
+ }
+
+
static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new
AuthorizableRequestContext() {
@Override
public String listenerName() {
@@ -253,17 +278,27 @@ public class GroupAuthorizerIntegrationTest {
}
}
+ @ClusterTest
+ public void
testClassicConsumeUnsubscribeWithGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance,
GroupProtocol.CLASSIC, true);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumeUnsubscribeWithGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance,
GroupProtocol.CONSUMER, true);
+ }
+
@ClusterTest
public void
testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
- testConsumeUnsubscribeWithGroupPermission(clusterInstance,
GroupProtocol.CLASSIC);
+ testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance,
GroupProtocol.CLASSIC, false);
}
@ClusterTest
public void
testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
- testConsumeUnsubscribeWithGroupPermission(clusterInstance,
GroupProtocol.CONSUMER);
+ testConsumeUnsubscribeWithOrWithoutGroupPermission(clusterInstance,
GroupProtocol.CONSUMER, false);
}
- private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws InterruptedException,
ExecutionException {
+ private void
testConsumeUnsubscribeWithOrWithoutGroupPermission(ClusterInstance
clusterInstance, GroupProtocol groupProtocol, boolean withGroupPermission)
throws InterruptedException, ExecutionException {
setup(clusterInstance);
String topic = "topic";
String group = "group";
@@ -297,6 +332,13 @@ public class GroupAuthorizerIntegrationTest {
ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofSeconds(15));
return records.count() == 1;
}, "consumer failed to receive message");
+ if (!withGroupPermission) {
+ removeAndVerifyAcls(
+ Set.of(createAcl(AclOperation.READ,
AclPermissionType.ALLOW, CLIENT_PRINCIPAL)),
+ new ResourcePattern(ResourceType.GROUP, group,
PatternType.LITERAL),
+ clusterInstance
+ );
+ }
assertDoesNotThrow(consumer::unsubscribe);
}
}