This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
     new 0f9ed787548 KAFKA-19413: Extended AuthorizerIntegrationTest to cover StreamsGroupDescribe (#19981)
0f9ed787548 is described below
commit 0f9ed78754845f11fdc83dd930a1614f92dfa043
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Jun 18 10:19:34 2025 +0200
    KAFKA-19413: Extended AuthorizerIntegrationTest to cover StreamsGroupDescribe (#19981)
    Extends test coverage of authorization for the streams group RPC
    StreamsGroupDescribe. The RPC requires DESCRIBE permission on the group
    and DESCRIBE permission on all topics of the topology.
Reviewers: Bill Bejeck <[email protected]>
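
    For reference, the ACL setup these tests exercise can also be expressed
    against the public Admin API. A minimal sketch, assuming placeholder names
    (principal User:client, group streams-group, topic source-topic) that are
    not taken from this commit:

        import org.apache.kafka.clients.admin.Admin
        import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclOperation, AclPermissionType}
        import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}

        def grantStreamsDescribeAcls(admin: Admin): Unit = {
          // DESCRIBE on the streams group itself.
          val groupAcl = new AclBinding(
            new ResourcePattern(ResourceType.GROUP, "streams-group", PatternType.LITERAL),
            new AccessControlEntry("User:client", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
          // DESCRIBE on each topic of the topology (source, repartition, and changelog topics alike).
          val topicAcl = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, "source-topic", PatternType.LITERAL),
            new AccessControlEntry("User:client", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
          admin.createAcls(java.util.List.of(groupAcl, topicAcl)).all().get()
        }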
---
.../kafka/api/AuthorizerIntegrationTest.scala | 181 ++++++++++++++++++++-
.../kafka/api/IntegrationTestHarness.scala | 35 +++-
2 files changed, 209 insertions(+), 7 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a869e2eb4ab..424772275ea 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -17,11 +17,12 @@ import java.time.Duration
import java.util
import java.util.concurrent.{ExecutionException, Semaphore}
import java.util.regex.Pattern
-import java.util.{Comparator, Optional, Properties}
+import java.util.{Comparator, Optional, Properties, UUID}
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ListGroupsOptions, NewTopic}
import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.{StreamsRebalanceData, StreamsRebalanceListener}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
@@ -37,7 +38,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, AlterShareGroupOffsetsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequ [...]
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
@@ -76,6 +77,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
  val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW)))
  val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
  val streamsGroupReadAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
+  val streamsGroupDescribeAcl = Map(streamsGroupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
  val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW)))
  val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
  val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@@ -225,7 +227,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
      resp.data.errorCode)),
    ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> ((resp: AlterShareGroupOffsetsResponse) => Errors.forCode(
      resp.data.errorCode)),
-    ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode))
+    ApiKeys.STREAMS_GROUP_HEARTBEAT -> ((resp: StreamsGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)),
+    ApiKeys.STREAMS_GROUP_DESCRIBE -> ((resp: StreamsGroupDescribeResponse) =>
+      Errors.forCode(resp.data.groups.asScala.find(g => streamsGroup == g.groupId).head.errorCode))
)
def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@@ -294,7 +298,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
    ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++ topicDescribeAcl),
    ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl),
    ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> (shareGroupReadAcl ++ topicReadAcl),
-    ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ topicDescribeAcl)
+    ApiKeys.STREAMS_GROUP_HEARTBEAT -> (streamsGroupReadAcl ++ topicDescribeAcl),
+    ApiKeys.STREAMS_GROUP_DESCRIBE -> (streamsGroupDescribeAcl ++ topicDescribeAcl),
)
private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@@ -870,6 +875,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
      ).asJava
    ))).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
+  private def streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
+    new StreamsGroupDescribeRequestData()
+      .setGroupIds(List(streamsGroup).asJava)
+      .setIncludeAuthorizedOperations(false)).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion)
+
  private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true,
                           topicNames: Map[Uuid, String] = getTopicNames()) = {
for ((key, request) <- requestKeyToRequest) {
@@ -954,6 +964,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
      ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest,
      ApiKeys.ALTER_SHARE_GROUP_OFFSETS -> alterShareGroupOffsetsRequest,
      ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest,
+      ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest,
// Delete the topic last
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
@@ -987,7 +998,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
      ApiKeys.SHARE_FETCH -> createShareFetchRequest,
      ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
      ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
-      ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest
+      ApiKeys.STREAMS_GROUP_HEARTBEAT -> streamsGroupHeartbeatRequest,
+      ApiKeys.STREAMS_GROUP_DESCRIBE -> streamsGroupDescribeRequest
)
sendRequests(requestKeyToRequest, topicExists = false, topicNames)
@@ -3853,6 +3865,165 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
      response.data().status())
  }
+  private def createStreamsGroupToDescribe(
+    topicAsSourceTopic: Boolean,
+    topicAsRepartitionSinkTopic: Boolean,
+    topicAsRepartitionSourceTopic: Boolean,
+    topicAsStateChangelogTopics: Boolean
+  ): Unit = {
+    createTopicWithBrokerPrincipal(sourceTopic)
+    createTopicWithBrokerPrincipal(topic)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), streamsGroupResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
+    addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), sourceTopicResource)
+    streamsConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroup)
+    streamsConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = createStreamsConsumer(streamsRebalanceData = new StreamsRebalanceData(
+      UUID.randomUUID(),
+      Optional.empty(),
+      util.Map.of(
+        "subtopology-0", new StreamsRebalanceData.Subtopology(
+          if (topicAsSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
+          if (topicAsRepartitionSinkTopic) util.Set.of(topic) else util.Set.of(),
+          if (topicAsRepartitionSourceTopic)
+            util.Map.of(topic, new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of()))
+          else util.Map.of(),
+          if (topicAsStateChangelogTopics)
+            util.Map.of(topic, new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of()))
+          else util.Map.of(),
+          util.Set.of()
+        )),
+      Map.empty[String, String].asJava
+    ))
+    consumer.subscribe(
+      if (topicAsSourceTopic || topicAsRepartitionSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
+      new StreamsRebalanceListener {
+        override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
+          Optional.empty()
+
+        override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] =
+          Optional.empty()
+
+        override def onAllTasksLost(): Optional[Exception] =
+          Optional.empty()
+      }
+    )
+    consumer.poll(Duration.ofMillis(500L))
+    removeAllClientAcls()
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true, false, false, false",
+    "false, true, false, false",
+    "false, false, true, false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithGroupDescribeAndTopicDescribeAcl(
+    topicAsSourceTopic: Boolean,
+    topicAsRepartitionSinkTopic: Boolean,
+    topicAsRepartitionSourceTopic: Boolean,
+    topicAsStateChangelogTopics: Boolean
+  ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    addAndVerifyAcls(streamsGroupDescribeAcl(streamsGroupResource), streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true, false, false, false",
+    "false, true, false, false",
+    "false, false, true, false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithOperationAll(
+    topicAsSourceTopic: Boolean,
+    topicAsRepartitionSinkTopic: Boolean,
+    topicAsRepartitionSourceTopic: Boolean,
+    topicAsStateChangelogTopics: Boolean
+  ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+
+    val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
+    addAndVerifyAcls(Set(allowAllOpsAcl), streamsGroupResource)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true, false, false, false",
+    "false, true, false, false",
+    "false, false, true, false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithoutGroupDescribeAcl(
+    topicAsSourceTopic: Boolean,
+    topicAsRepartitionSinkTopic: Boolean,
+    topicAsRepartitionSourceTopic: Boolean,
+    topicAsStateChangelogTopics: Boolean
+  ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
+    addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array(
+    "true, false, false, false",
+    "false, true, false, false",
+    "false, false, true, false",
+    "false, false, false, true"
+  ))
+  def testStreamsGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(
+    topicAsSourceTopic: Boolean,
+    topicAsRepartitionSinkTopic: Boolean,
+    topicAsRepartitionSourceTopic: Boolean,
+    topicAsStateChangelogTopics: Boolean
+  ): Unit = {
+    createStreamsGroupToDescribe(
+      topicAsSourceTopic,
+      topicAsRepartitionSinkTopic,
+      topicAsRepartitionSourceTopic,
+      topicAsStateChangelogTopics
+    )
+
+    val request = streamsGroupDescribeRequest
+    val resource = Set[ResourceType](GROUP, TOPIC)
+    addAndVerifyAcls(sourceTopicDescribeAcl(sourceTopicResource), sourceTopicResource) // Always added, since we need a source topic
+
+    sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+  }
+
  private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
                                                listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = {
val request = new ConsumerGroupHeartbeatRequest.Builder(
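
    The per-group error extraction registered in the response-handler map above
    can be read in isolation as the following sketch; response and streamsGroup
    mirror the names used in the diff, and the helper itself is illustrative,
    not part of the commit:

        import scala.jdk.CollectionConverters._
        import org.apache.kafka.common.protocol.Errors
        import org.apache.kafka.common.requests.StreamsGroupDescribeResponse

        // StreamsGroupDescribe returns one entry per requested group id, each
        // carrying its own error code; select the entry for the group under test.
        def describedGroupError(response: StreamsGroupDescribeResponse, streamsGroup: String): Errors =
          Errors.forCode(response.data.groups.asScala.find(g => g.groupId == streamsGroup).head.errorCode)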
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index e062dcc09fa..7c08dd9c3fe 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -22,14 +22,16 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume
import kafka.utils.TestUtils
import kafka.utils.Implicits._
-import java.util.Properties
+import java.util.{Optional, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import kafka.security.JaasTestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
+import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
@@ -49,6 +51,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val producerConfig = new Properties
val consumerConfig = new Properties
val shareConsumerConfig = new Properties
+ val streamsConsumerConfig = new Properties
val adminClientConfig = new Properties
val superuserClientConfig = new Properties
val serverConfig = new Properties
@@ -56,6 +59,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
private val consumers = mutable.Buffer[Consumer[_, _]]()
private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]()
+ private val streamsConsumers = mutable.Buffer[Consumer[_, _]]()
private val producers = mutable.Buffer[KafkaProducer[_, _]]()
private val adminClients = mutable.Buffer[Admin]()
@@ -148,7 +152,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
shareConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
    shareConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    shareConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
-
+
+    streamsConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+    streamsConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
+    streamsConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
+    streamsConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
+
    adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
doSuperuserSetup(testInfo)
@@ -207,6 +216,25 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
shareConsumer
}
+  def createStreamsConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
+                                  valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
+                                  configOverrides: Properties = new Properties,
+                                  configsToRemove: List[String] = List(),
+                                  streamsRebalanceData: StreamsRebalanceData): AsyncKafkaConsumer[K, V] = {
+    val props = new Properties
+    props ++= streamsConsumerConfig
+    props ++= configOverrides
+    configsToRemove.foreach(props.remove(_))
+    val streamsConsumer = new AsyncKafkaConsumer[K, V](
+      new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(Utils.propsToMap(props), keyDeserializer, valueDeserializer)),
+      keyDeserializer,
+      valueDeserializer,
+      Optional.of(streamsRebalanceData)
+    )
+    streamsConsumers += streamsConsumer
+    streamsConsumer
+  }
+
def createAdminClient(
listenerName: ListenerName = listenerName,
configOverrides: Properties = new Properties
@@ -239,11 +267,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
consumers.foreach(_.close(Duration.ZERO))
shareConsumers.foreach(_.wakeup())
shareConsumers.foreach(_.close(Duration.ZERO))
+ streamsConsumers.foreach(_.wakeup())
+ streamsConsumers.foreach(_.close(Duration.ZERO))
adminClients.foreach(_.close(Duration.ZERO))
producers.clear()
consumers.clear()
shareConsumers.clear()
+ streamsConsumers.clear()
adminClients.clear()
} finally {
super.tearDown()
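
As a usage note, the new createStreamsConsumer helper is driven from
AuthorizerIntegrationTest roughly as in the sketch below (a single source
topic, no repartition or changelog topics; "source-topic" is a placeholder,
and the per-argument comments are my reading of the StreamsRebalanceData
constructor rather than names from the diff):

    val consumer = createStreamsConsumer(streamsRebalanceData = new StreamsRebalanceData(
      UUID.randomUUID(),                  // process id
      Optional.empty(),                   // no user endpoint
      util.Map.of("subtopology-0", new StreamsRebalanceData.Subtopology(
        util.Set.of("source-topic"),      // source topics
        util.Set.of(),                    // repartition sink topics
        util.Map.of(),                    // repartition source topics
        util.Map.of(),                    // state changelog topics
        util.Set.of())),                  // copartition groups
      Map.empty[String, String].asJava))  // client tags

The consumer is then subscribed with a StreamsRebalanceListener and polled
once, as createStreamsGroupToDescribe does above.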