This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cc0f06554ba KAFKA-19042 Move GroupAuthorizerIntegrationTest to
clients-integration-tests module (#19685)
cc0f06554ba is described below
commit cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b
Author: Nick Guo <[email protected]>
AuthorDate: Sat May 31 02:34:56 2025 +0800
KAFKA-19042 Move GroupAuthorizerIntegrationTest to
clients-integration-tests module (#19685)
move GroupAuthorizerIntegrationTest to clients-integration-tests module
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, keemsisi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../security/GroupAuthorizerIntegrationTest.java | 402 +++++++++++++++++++++
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 239 ------------
2 files changed, 402 insertions(+), 239 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
new file mode 100644
index 00000000000..725c0f53786
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/security/GroupAuthorizerIntegrationTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.security;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.AuthenticationContext;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.config.ServerConfigs;
+
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@ClusterTestDefaults(serverProperties = {
+ @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value
= "Group:broker"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"),
+ @ClusterConfigProperty(key =
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, value =
"org.apache.kafka.clients.security.GroupAuthorizerIntegrationTest$GroupPrincipalBuilder"),
+})
+public class GroupAuthorizerIntegrationTest {
+ private static final KafkaPrincipal BROKER_PRINCIPAL = new
KafkaPrincipal("Group", "broker");
+ private static final KafkaPrincipal CLIENT_PRINCIPAL = new
KafkaPrincipal("Group", "client");
+
+ private static final String BROKER_LISTENER_NAME = "BROKER";
+ private static final String CLIENT_LISTENER_NAME = "EXTERNAL";
+ private static final String CONTROLLER_LISTENER_NAME = "CONTROLLER";
+
+ private Authorizer getAuthorizer(ClusterInstance clusterInstance) {
+ return clusterInstance.controllers().values().stream()
+ .filter(server -> server.authorizerPlugin().isDefined())
+ .map(server ->
server.authorizerPlugin().get().get()).findFirst().get();
+ }
+
+ private void setup(ClusterInstance clusterInstance) throws
InterruptedException {
+ // Allow inter-broker communication
+ addAndVerifyAcls(
+ Set.of(createAcl(AclOperation.CLUSTER_ACTION,
AclPermissionType.ALLOW, BROKER_PRINCIPAL)),
+ new ResourcePattern(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL),
+ clusterInstance
+ );
+ addAndVerifyAcls(
+ Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL)),
+ new ResourcePattern(ResourceType.TOPIC,
Topic.GROUP_METADATA_TOPIC_NAME, PatternType.LITERAL),
+ clusterInstance
+ );
+
+ NewTopic offsetTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME,
1, (short) 1);
+ try (Admin admin = clusterInstance.admin(Map.of(
+ AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true))
+ ) {
+ admin.createTopics(Collections.singleton(offsetTopic));
+ clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1);
+ }
+ }
+
+ public static class GroupPrincipalBuilder extends
DefaultKafkaPrincipalBuilder {
+ public GroupPrincipalBuilder() {
+ super(null, null);
+ }
+
+ @Override
+ public KafkaPrincipal build(AuthenticationContext context) {
+ String listenerName = context.listenerName();
+ return switch (listenerName) {
+ case BROKER_LISTENER_NAME, CONTROLLER_LISTENER_NAME ->
BROKER_PRINCIPAL;
+ case CLIENT_LISTENER_NAME -> CLIENT_PRINCIPAL;
+ default -> throw new IllegalArgumentException("No principal
mapped to listener " + listenerName);
+ };
+ }
+ }
+
+ private AccessControlEntry createAcl(AclOperation aclOperation,
AclPermissionType aclPermissionType, KafkaPrincipal principal) {
+ return new AccessControlEntry(
+ principal.toString(),
+ WILDCARD_HOST,
+ aclOperation,
+ aclPermissionType
+ );
+ }
+
+ private void addAndVerifyAcls(Set<AccessControlEntry> acls,
ResourcePattern resource, ClusterInstance clusterInstance) throws
InterruptedException {
+ List<AclBinding> aclBindings = acls.stream().map(acl -> new
AclBinding(resource, acl)).toList();
+ Authorizer authorizer = getAuthorizer(clusterInstance);
+ authorizer.createAcls(ANONYMOUS_CONTEXT, aclBindings)
+ .forEach(future -> {
+ try {
+ future.toCompletableFuture().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Failed to create ACLs", e);
+ }
+ });
+ AclBindingFilter aclBindingFilter = new
AclBindingFilter(resource.toFilter(), AccessControlEntryFilter.ANY);
+ clusterInstance.waitAcls(aclBindingFilter, acls);
+ }
+
+ static final AuthorizableRequestContext ANONYMOUS_CONTEXT = new
AuthorizableRequestContext() {
+ @Override
+ public String listenerName() {
+ return "";
+ }
+
+ @Override
+ public SecurityProtocol securityProtocol() {
+ return SecurityProtocol.PLAINTEXT;
+ }
+
+ @Override
+ public KafkaPrincipal principal() {
+ return KafkaPrincipal.ANONYMOUS;
+ }
+
+ @Override
+ public InetAddress clientAddress() {
+ return null;
+ }
+
+ @Override
+ public int requestType() {
+ return 0;
+ }
+
+ @Override
+ public int requestVersion() {
+ return 0;
+ }
+
+ @Override
+ public String clientId() {
+ return "";
+ }
+
+ @Override
+ public int correlationId() {
+ return 0;
+ }
+ };
+
+ @ClusterTest
+ public void
testUnauthorizedProduceAndConsumeWithClassicConsumer(ClusterInstance
clusterInstance) throws InterruptedException {
+ testUnauthorizedProduceAndConsume(clusterInstance,
GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void
testUnauthorizedProduceAndConsumeWithAsyncConsumer(ClusterInstance
clusterInstance) throws InterruptedException {
+ testUnauthorizedProduceAndConsume(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ public void testUnauthorizedProduceAndConsume(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws InterruptedException {
+ setup(clusterInstance);
+ String topic = "topic";
+ String group = "group";
+
+ addAndVerifyAcls(
+ Set.of(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL)),
+ new ResourcePattern(ResourceType.TOPIC, topic,
PatternType.LITERAL),
+ clusterInstance
+ );
+ addAndVerifyAcls(
+ Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL)),
+ new ResourcePattern(ResourceType.GROUP, group,
PatternType.LITERAL),
+ clusterInstance
+ );
+
+ Producer<byte[], byte[]> producer = clusterInstance.producer();
+ Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+ GROUP_PROTOCOL_CONFIG,
groupProtocol.name.toLowerCase(Locale.ROOT),
+ ConsumerConfig.GROUP_ID_CONFIG, group
+ ));
+
+ try {
+ clusterInstance.createTopic(topic, 1, (short) 1);
+ ExecutionException produceException = assertThrows(
+ ExecutionException.class,
+ () -> producer.send(new ProducerRecord<>(topic,
"message".getBytes())).get()
+ );
+ Throwable cause = produceException.getCause();
+ assertInstanceOf(TopicAuthorizationException.class, cause);
+ TopicAuthorizationException topicAuthException =
(TopicAuthorizationException) cause;
+ assertEquals(Set.of(topic),
topicAuthException.unauthorizedTopics());
+
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ consumer.assign(Collections.singletonList(topicPartition));
+ TopicAuthorizationException consumeException = assertThrows(
+ TopicAuthorizationException.class,
+ () -> consumer.poll(Duration.ofSeconds(15))
+ );
+ assertEquals(consumeException.unauthorizedTopics(),
topicAuthException.unauthorizedTopics());
+ } finally {
+ producer.close(Duration.ZERO);
+ consumer.close();
+ }
+ }
+
+ @ClusterTest
+ public void
testClassicConsumeUnsubscribeWithoutGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testConsumeUnsubscribeWithGroupPermission(clusterInstance,
GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void
testAsyncConsumeUnsubscribeWithoutGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testConsumeUnsubscribeWithGroupPermission(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private void testConsumeUnsubscribeWithGroupPermission(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws InterruptedException,
ExecutionException {
+ setup(clusterInstance);
+ String topic = "topic";
+ String group = "group";
+
+ // allow topic read/write permission to poll/send record
+ Set<AccessControlEntry> acls = new HashSet<>();
+ acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ addAndVerifyAcls(
+ acls,
+ new ResourcePattern(ResourceType.TOPIC, topic,
PatternType.LITERAL),
+ clusterInstance
+ );
+ addAndVerifyAcls(
+ Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL)),
+ new ResourcePattern(ResourceType.GROUP, group,
PatternType.LITERAL),
+ clusterInstance
+ );
+
+ try (Producer<byte[], byte[]> producer = clusterInstance.producer();
+ Consumer<byte[], byte[]> consumer =
clusterInstance.consumer(Map.of(
+ ConsumerConfig.GROUP_ID_CONFIG, group,
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+ GROUP_PROTOCOL_CONFIG,
groupProtocol.name.toLowerCase(Locale.ROOT)))
+ ) {
+ clusterInstance.createTopic(topic, 1, (short) 1);
+ producer.send(new ProducerRecord<>(topic,
"message".getBytes())).get();
+ consumer.subscribe(Collections.singletonList(topic));
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofSeconds(15));
+ return records.count() == 1;
+ }, "consumer failed to receive message");
+ assertDoesNotThrow(consumer::unsubscribe);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumeCloseWithGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testConsumeCloseWithGroupPermission(clusterInstance,
GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumeCloseWithGroupPermission(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testConsumeCloseWithGroupPermission(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private void testConsumeCloseWithGroupPermission(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws InterruptedException,
ExecutionException {
+ setup(clusterInstance);
+ String topic = "topic";
+ String group = "group";
+
+ // allow topic read/write permission to poll/send record
+ Set<AccessControlEntry> acls = new HashSet<>();
+ acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ addAndVerifyAcls(
+ acls,
+ new ResourcePattern(ResourceType.TOPIC, topic,
PatternType.LITERAL),
+ clusterInstance
+ );
+ addAndVerifyAcls(
+ Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL)),
+ new ResourcePattern(ResourceType.GROUP, group,
PatternType.LITERAL),
+ clusterInstance
+ );
+
+ Producer<Object, Object> producer = clusterInstance.producer();
+ Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
+ ConsumerConfig.GROUP_ID_CONFIG, group,
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+ GROUP_PROTOCOL_CONFIG,
groupProtocol.name.toLowerCase(Locale.ROOT)));
+
+ try {
+ clusterInstance.createTopic(topic, 1, (short) 1);
+ producer.send(new ProducerRecord<>(topic,
"message".getBytes())).get();
+ consumer.subscribe(List.of(topic));
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofSeconds(15));
+ return records.count() == 1;
+ }, "consumer failed to receive message");
+ } finally {
+ producer.close();
+ assertDoesNotThrow(() -> consumer.close());
+ }
+ }
+
+ @ClusterTest
+ public void testAuthorizedProduceAndConsumeWithClassic(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testAuthorizedProduceAndConsume(clusterInstance,
GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAuthorizedProduceAndConsumeWithAsync(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ testAuthorizedProduceAndConsume(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private void testAuthorizedProduceAndConsume(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws InterruptedException,
ExecutionException {
+ setup(clusterInstance);
+ String topic = "topic";
+ String group = "group";
+
+ Set<AccessControlEntry> acls = new HashSet<>();
+ acls.add(createAcl(AclOperation.CREATE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ acls.add(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ acls.add(createAcl(AclOperation.READ, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL));
+ addAndVerifyAcls(
+ acls,
+ new ResourcePattern(ResourceType.TOPIC, topic,
PatternType.LITERAL),
+ clusterInstance
+ );
+ addAndVerifyAcls(
+ Set.of(createAcl(AclOperation.READ, AclPermissionType.ALLOW,
CLIENT_PRINCIPAL)),
+ new ResourcePattern(ResourceType.GROUP, group,
PatternType.LITERAL),
+ clusterInstance
+ );
+
+ try (Producer<byte[], byte[]> producer = clusterInstance.producer();
+ Consumer<byte[], byte[]> consumer =
clusterInstance.consumer(Map.of(
+ ConsumerConfig.GROUP_ID_CONFIG, group,
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
+ GROUP_PROTOCOL_CONFIG,
groupProtocol.name.toLowerCase(Locale.ROOT)))
+ ) {
+ clusterInstance.createTopic(topic, 1, (short) 1);
+ producer.send(new ProducerRecord<>(topic,
"message".getBytes())).get();
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ consumer.assign(List.of(topicPartition));
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofSeconds(15));
+ return records.count() == 1;
+ }, "consumer failed to receive message");
+ }
+ }
+
+}
diff --git
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
deleted file mode 100644
index 01d18114a04..00000000000
---
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import java.util.Properties
-import java.util.concurrent.ExecutionException
-import kafka.api.GroupAuthorizerIntegrationTest._
-import kafka.server.BaseRequestTest
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation,
AclPermissionType}
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.errors.TopicAuthorizationException
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourceType}
-import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal}
-import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.authorizer.StandardAuthorizer
-import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
-import org.apache.kafka.server.config.ServerConfigs
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.jdk.CollectionConverters._
-
-object GroupAuthorizerIntegrationTest {
- val BrokerPrincipal = new KafkaPrincipal("Group", "broker")
- val ClientPrincipal = new KafkaPrincipal("Group", "client")
-
- val BrokerListenerName = "BROKER"
- val ClientListenerName = "CLIENT"
- val ControllerListenerName = "CONTROLLER"
-
- class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null)
{
- override def build(context: AuthenticationContext): KafkaPrincipal = {
- context.listenerName match {
- case BrokerListenerName | ControllerListenerName => BrokerPrincipal
- case ClientListenerName => ClientPrincipal
- case listenerName => throw new IllegalArgumentException(s"No principal
mapped to listener $listenerName")
- }
- }
- }
-}
-
-class GroupAuthorizerIntegrationTest extends BaseRequestTest {
-
- val brokerId: Integer = 0
-
- override def brokerCount: Int = 1
- override def interBrokerListenerName: ListenerName = new
ListenerName(BrokerListenerName)
- override def listenerName: ListenerName = new
ListenerName(ClientListenerName)
-
- def brokerPrincipal: KafkaPrincipal = BrokerPrincipal
- def clientPrincipal: KafkaPrincipal = ClientPrincipal
-
- override def kraftControllerConfigs(testInfo: TestInfo):
collection.Seq[Properties] = {
- val controllerConfigs = super.kraftControllerConfigs(testInfo)
- controllerConfigs.foreach(addNodeProperties)
- controllerConfigs
- }
-
- override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(ServerConfigs.BROKER_ID_CONFIG, brokerId.toString)
- addNodeProperties(properties)
- }
-
- private def addNodeProperties(properties: Properties): Unit = {
- properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
classOf[StandardAuthorizer].getName)
- properties.put(StandardAuthorizer.SUPER_USERS_CONFIG,
BrokerPrincipal.toString)
-
- properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
-
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
- properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
-
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
- properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
- properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[GroupPrincipalBuilder].getName)
- }
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- doSetup(testInfo, createOffsetsTopic = false)
-
- // Allow inter-broker communication
- addAndVerifyAcls(
- Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW,
principal = BrokerPrincipal)),
- new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME,
PatternType.LITERAL)
- )
-
- createOffsetsTopic(interBrokerListenerName)
- }
-
- private def createAcl(aclOperation: AclOperation,
- aclPermissionType: AclPermissionType,
- principal: KafkaPrincipal = ClientPrincipal):
AccessControlEntry = {
- new AccessControlEntry(principal.toString, WILDCARD_HOST, aclOperation,
aclPermissionType)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testUnauthorizedProduceAndConsume(groupProtocol: String): Unit = {
- val topic = "topic"
- val topicPartition = new TopicPartition("topic", 0)
-
- createTopic(topic, listenerName = interBrokerListenerName)
-
- val producer = createProducer()
- val produceException = assertThrows(classOf[ExecutionException],
- () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()).getCause
- assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
- assertEquals(Set(topic),
produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
-
- val consumer = createConsumer(configsToRemove =
List(ConsumerConfig.GROUP_ID_CONFIG))
- consumer.assign(java.util.List.of(topicPartition))
- val consumeException = assertThrows(classOf[TopicAuthorizationException],
- () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
- assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- @Timeout(60)
- def testConsumeUnsubscribeWithoutGroupPermission(groupProtocol: String):
Unit = {
- val topic = "topic"
-
- createTopic(topic, listenerName = interBrokerListenerName)
-
- // allow topic read/write permission to poll/send record
- addAndVerifyAcls(
- Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW),
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
- )
- val producer = createProducer()
- producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()
- producer.close()
-
- // allow group read permission to join group
- val group = "group"
- addAndVerifyAcls(
- Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
- )
-
- val props = new Properties()
- props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- val consumer = createConsumer(configOverrides = props)
- consumer.subscribe(java.util.List.of(topic))
- TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
-
- removeAndVerifyAcls(
- Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
- )
-
- assertDoesNotThrow(new Executable {
- override def execute(): Unit = consumer.unsubscribe()
- })
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testConsumeCloseWithoutGroupPermission(groupProtocol: String): Unit = {
- val topic = "topic"
- createTopic(topic, listenerName = interBrokerListenerName)
-
- // allow topic read/write permission to poll/send record
- addAndVerifyAcls(
- Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW),
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
- )
- val producer = createProducer()
- producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()
-
- // allow group read permission to join group
- val group = "group"
- addAndVerifyAcls(
- Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
- )
-
- val props = new Properties()
- props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- val consumer = createConsumer(configOverrides = props)
- consumer.subscribe(java.util.List.of(topic))
- TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
-
- removeAndVerifyAcls(
- Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
- )
-
- assertDoesNotThrow(new Executable {
- override def execute(): Unit = consumer.close()
- })
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testAuthorizedProduceAndConsume(groupProtocol: String): Unit = {
- val topic = "topic"
- val topicPartition = new TopicPartition("topic", 0)
-
- createTopic(topic, listenerName = interBrokerListenerName)
-
- addAndVerifyAcls(
- Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
- )
- val producer = createProducer()
- producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()
-
- addAndVerifyAcls(
- Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
- new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
- )
- val consumer = createConsumer(configsToRemove =
List(ConsumerConfig.GROUP_ID_CONFIG))
- consumer.assign(java.util.List.of(topicPartition))
- TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
- }
-
-}