This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e3c456ff0fe KAFKA-19169: Enhance AuthorizerIntegrationTest for share
group APIs (#19540)
e3c456ff0fe is described below
commit e3c456ff0fe96827e55207a3326d68303563cd0d
Author: Lan Ding <[email protected]>
AuthorDate: Thu May 1 17:13:43 2025 +0800
KAFKA-19169: Enhance AuthorizerIntegrationTest for share group APIs (#19540)
Enhance AuthorizerIntegrationTest for share group APIs
Reviewers: Andrew Schofield <[email protected]>
---
.../api/AbstractAuthorizerIntegrationTest.scala | 2 +
.../kafka/api/AuthorizerIntegrationTest.scala | 648 ++++++++++++++++++++-
2 files changed, 644 insertions(+), 6 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index dc836352787..54f6d71a278 100644
---
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -80,11 +80,13 @@ class AbstractAuthorizerIntegrationTest extends
BaseRequestTest {
val tp = new TopicPartition(topic, part)
val logDir = "logDir"
val group = "my-group"
+ val shareGroup = "share-group"
val protocolType = "consumer"
val protocolName = "consumer-range"
val clusterResource = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME,
LITERAL)
val topicResource = new ResourcePattern(TOPIC, topic, LITERAL)
val groupResource = new ResourcePattern(GROUP, group, LITERAL)
+ val shareGroupResource = new ResourcePattern(GROUP, shareGroup, LITERAL)
val transactionalIdResource = new ResourcePattern(TRANSACTIONAL_ID,
transactionalId, LITERAL)
producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d75bdc9df6d..d42ba245f6e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -37,7 +37,7 @@ import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition,
ListOffsetsTopic}
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData,
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData,
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData,
DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData,
DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsR [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData,
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, CreateAclsRequestData,
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData,
DeleteGroupsRequestData, DeleteRecordsRequestData,
DeleteShareGroupOffsetsRequestData, DeleteShareGroupStateRequestData,
DeleteTopicsRequestData, D [...]
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch,
SimpleRecord}
@@ -48,7 +48,7 @@ import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException,
TopicPartition, Uuid, requests}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException,
TopicIdPartition, TopicPartition, Uuid, requests}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
@@ -73,6 +73,11 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
val groupDeleteAcl = Map(groupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
val groupDescribeConfigsAcl = Map(groupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS,
ALLOW)))
val groupAlterConfigsAcl = Map(groupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
+ val shareGroupReadAcl = Map(shareGroupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
+ val shareGroupDescribeAcl = Map(shareGroupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
+ val shareGroupDeleteAcl = Map(shareGroupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
+ val shareGroupDescribeConfigsAcl = Map(shareGroupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS,
ALLOW)))
+ val shareGroupAlterConfigsAcl = Map(shareGroupResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
val clusterAcl = Map(clusterResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION,
ALLOW)))
val clusterCreateAcl = Map(clusterResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
val clusterAlterAcl = Map(clusterResource -> Set(new
AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@@ -199,7 +204,26 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}),
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> ((resp:
ConsumerGroupHeartbeatResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.CONSUMER_GROUP_DESCRIBE -> ((resp: ConsumerGroupDescribeResponse)
=>
- Errors.forCode(resp.data.groups.asScala.find(g => group ==
g.groupId).head.errorCode))
+ Errors.forCode(resp.data.groups.asScala.find(g => group ==
g.groupId).head.errorCode)),
+ ApiKeys.SHARE_GROUP_HEARTBEAT -> ((resp: ShareGroupHeartbeatResponse) =>
Errors.forCode(resp.data.errorCode)),
+ ApiKeys.SHARE_GROUP_DESCRIBE -> ((resp: ShareGroupDescribeResponse) =>
+ Errors.forCode(resp.data.groups.asScala.find(g => shareGroup ==
g.groupId).head.errorCode)),
+ ApiKeys.SHARE_FETCH -> ((resp: ShareFetchResponse) =>
Errors.forCode(resp.data.errorCode)),
+ ApiKeys.SHARE_ACKNOWLEDGE -> ((resp: ShareAcknowledgeResponse) =>
Errors.forCode(resp.data.errorCode)),
+ ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> ((resp:
InitializeShareGroupStateResponse) => Errors.forCode(
+ resp.data.results.get(0).partitions.get(0).errorCode)),
+ ApiKeys.READ_SHARE_GROUP_STATE -> ((resp: ReadShareGroupStateResponse) =>
Errors.forCode(
+ resp.data.results.get(0).partitions.get(0).errorCode)),
+ ApiKeys.WRITE_SHARE_GROUP_STATE -> ((resp: WriteShareGroupStateResponse)
=> Errors.forCode(
+ resp.data.results.get(0).partitions.get(0).errorCode)),
+ ApiKeys.DELETE_SHARE_GROUP_STATE -> ((resp: DeleteShareGroupStateResponse)
=> Errors.forCode(
+ resp.data.results.get(0).partitions.get(0).errorCode)),
+ ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> ((resp:
ReadShareGroupStateSummaryResponse) => Errors.forCode(
+ resp.data.results.get(0).partitions.get(0).errorCode)),
+ ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> ((resp:
DescribeShareGroupOffsetsResponse) => Errors.forCode(
+ resp.data.groups.asScala.find(g => shareGroup ==
g.groupId).head.errorCode)),
+ ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> ((resp:
DeleteShareGroupOffsetsResponse) => Errors.forCode(
+ resp.data.errorCode))
)
def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
@@ -255,7 +279,18 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl,
ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl,
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> groupReadAcl,
- ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl
+ ApiKeys.CONSUMER_GROUP_DESCRIBE -> groupDescribeAcl,
+ ApiKeys.SHARE_GROUP_HEARTBEAT -> (shareGroupReadAcl ++ topicDescribeAcl),
+ ApiKeys.SHARE_GROUP_DESCRIBE -> (shareGroupDescribeAcl ++
topicDescribeAcl),
+ ApiKeys.SHARE_FETCH -> (shareGroupReadAcl ++ topicReadAcl),
+ ApiKeys.SHARE_ACKNOWLEDGE -> (shareGroupReadAcl ++ topicReadAcl),
+ ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> clusterAcl,
+ ApiKeys.READ_SHARE_GROUP_STATE -> clusterAcl,
+ ApiKeys.WRITE_SHARE_GROUP_STATE -> clusterAcl,
+ ApiKeys.DELETE_SHARE_GROUP_STATE -> clusterAcl,
+ ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY -> clusterAcl,
+ ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> (shareGroupDescribeAcl ++
topicDescribeAcl),
+ ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> (shareGroupDeleteAcl ++ topicReadAcl)
)
private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
@@ -655,6 +690,120 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
.setGroupIds(List(group).asJava)
.setIncludeAuthorizedOperations(false)).build()
+ private def shareGroupHeartbeatRequest = new
ShareGroupHeartbeatRequest.Builder(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(shareGroup)
+ .setMemberEpoch(0)
+
.setSubscribedTopicNames(List(topic).asJava)).build(ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion)
+
+
+ private def shareGroupDescribeRequest = new
ShareGroupDescribeRequest.Builder(
+ new ShareGroupDescribeRequestData()
+ .setGroupIds(List(shareGroup).asJava)
+
.setIncludeAuthorizedOperations(false)).build(ApiKeys.SHARE_GROUP_DESCRIBE.latestVersion)
+
+
+ private def createShareFetchRequest = {
+ val metadata: ShareRequestMetadata = new
ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH)
+ val send: Seq[TopicIdPartition] = Seq(
+ new TopicIdPartition(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID),
new TopicPartition(topic, part)))
+ val ackMap = new util.HashMap[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]]
+ requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100,
0, Int.MaxValue, 500, 500,
+ send.asJava, Seq.empty.asJava, ackMap).build()
+ }
+
+ private def shareAcknowledgeRequest = {
+ val shareAcknowledgeRequestData = new ShareAcknowledgeRequestData()
+ .setGroupId(shareGroup)
+ .setMemberId(Uuid.randomUuid().toString)
+ .setShareSessionEpoch(1)
+ .setTopics(List(new ShareAcknowledgeRequestData.AcknowledgeTopic()
+ .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+ .setPartitions(List(
+ new ShareAcknowledgeRequestData.AcknowledgePartition()
+ .setPartitionIndex(part)
+ .setAcknowledgementBatches(List(
+ new ShareAcknowledgeRequestData.AcknowledgementBatch()
+ .setFirstOffset(0)
+ .setLastOffset(1)
+ .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+ ).asJava)
+ ).asJava)
+ ).asJava)
+
+ new
ShareAcknowledgeRequest.Builder(shareAcknowledgeRequestData).build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion)
+ }
+
+ private def initializeShareGroupStateRequest = new
InitializeShareGroupStateRequest.Builder(
+ new InitializeShareGroupStateRequestData()
+ .setGroupId(shareGroup)
+ .setTopics(List(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+ .setPartitions(List(new
InitializeShareGroupStateRequestData.PartitionData()
+ .setPartition(part)
+ ).asJava)
+ ).asJava)).build()
+
+ private def readShareGroupStateRequest = new
ReadShareGroupStateRequest.Builder(
+ new ReadShareGroupStateRequestData()
+ .setGroupId(shareGroup)
+ .setTopics(List(new ReadShareGroupStateRequestData.ReadStateData()
+ .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+ .setPartitions(List(new ReadShareGroupStateRequestData.PartitionData()
+ .setPartition(part)
+ .setLeaderEpoch(0)
+ ).asJava)
+ ).asJava)).build()
+
+ private def writeShareGroupStateRequest = new
WriteShareGroupStateRequest.Builder(
+ new WriteShareGroupStateRequestData()
+ .setGroupId(shareGroup)
+ .setTopics(List(new WriteShareGroupStateRequestData.WriteStateData()
+ .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+ .setPartitions(List(new WriteShareGroupStateRequestData.PartitionData()
+ .setPartition(part)
+ ).asJava)
+ ).asJava)).build()
+
+ private def deleteShareGroupStateRequest = new
DeleteShareGroupStateRequest.Builder(
+ new DeleteShareGroupStateRequestData()
+ .setGroupId(shareGroup)
+ .setTopics(List(new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+ .setPartitions(List(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(part)
+ ).asJava)
+ ).asJava)).build()
+
+ private def readShareGroupStateSummaryRequest = new
ReadShareGroupStateSummaryRequest.Builder(
+ new ReadShareGroupStateSummaryRequestData()
+ .setGroupId(shareGroup)
+ .setTopics(List(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
+ .setPartitions(List(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(part)
+ .setLeaderEpoch(0)
+ ).asJava)
+ ).asJava)).build(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY.latestVersion)
+
+ private def describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequest.Builder(
+ new DescribeShareGroupOffsetsRequestData()
+ .setGroups(List(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId(shareGroup)
+ .setTopics(List(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(topic)
+ .setPartitions(List(Integer.valueOf(part)
+ ).asJava)
+ ).asJava)
+ ).asJava)).build(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.latestVersion)
+
+ private def deleteShareGroupOffsetsRequest = new
DeleteShareGroupOffsetsRequest.Builder(
+ new DeleteShareGroupOffsetsRequestData()
+ .setGroupId(shareGroup)
+ .setTopics(List(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topic)
+ ).asJava)).build(ApiKeys.DELETE_SHARE_GROUP_OFFSETS.latestVersion)
+
private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys,
AbstractRequest], topicExists: Boolean = true,
topicNames: Map[Uuid, String] = getTopicNames()) = {
for ((key, request) <- requestKeyToRequest) {
@@ -669,6 +818,8 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
// In KRaft mode, trying to delete a topic that doesn't exist but
that you do have
// describe permission for will give UNKNOWN_TOPIC_OR_PARTITION.
true
+ } else if (resourceToAcls.size > 1) {
+ false
} else {
describeAcls == acls
}
@@ -684,7 +835,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft"))
+ @ValueSource(strings = Array("kip932"))
def testAuthorizationWithTopicExisting(quorum: String): Unit = {
//First create the topic so we have a valid topic ID
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
@@ -723,6 +874,18 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
ApiKeys.WRITE_TXN_MARKERS -> writeTxnMarkersRequest,
ApiKeys.CONSUMER_GROUP_HEARTBEAT -> consumerGroupHeartbeatRequest,
ApiKeys.CONSUMER_GROUP_DESCRIBE -> consumerGroupDescribeRequest,
+ ApiKeys.SHARE_GROUP_HEARTBEAT -> shareGroupHeartbeatRequest,
+ ApiKeys.SHARE_GROUP_DESCRIBE -> shareGroupDescribeRequest,
+ ApiKeys.SHARE_FETCH -> createShareFetchRequest,
+ ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
+ ApiKeys.INITIALIZE_SHARE_GROUP_STATE -> initializeShareGroupStateRequest,
+ ApiKeys.READ_SHARE_GROUP_STATE -> readShareGroupStateRequest,
+ ApiKeys.WRITE_SHARE_GROUP_STATE -> writeShareGroupStateRequest,
+ ApiKeys.DELETE_SHARE_GROUP_STATE -> deleteShareGroupStateRequest,
+ ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY ->
readShareGroupStateSummaryRequest,
+ ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest,
+ ApiKeys.DELETE_SHARE_GROUP_OFFSETS -> deleteShareGroupOffsetsRequest,
+
// Delete the topic last
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
)
@@ -752,7 +915,10 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
- ApiKeys.ELECT_LEADERS -> electLeadersRequest
+ ApiKeys.ELECT_LEADERS -> electLeadersRequest,
+ ApiKeys.SHARE_FETCH -> createShareFetchRequest,
+ ApiKeys.SHARE_ACKNOWLEDGE -> shareAcknowledgeRequest,
+ ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS -> describeShareGroupOffsetsRequest
)
sendRequests(requestKeyToRequest, false, topicNames)
@@ -2653,6 +2819,476 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName,
Some(0), fullRequest = true)
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String):
Unit = {
+ addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+ val request = shareGroupHeartbeatRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupHeartbeatWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+ val request = shareGroupHeartbeatRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum:
String): Unit = {
+ removeAllClientAcls()
+
+ val request = shareGroupHeartbeatRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = {
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+ val request = shareGroupHeartbeatRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = {
+ addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+
+ val request = shareGroupHeartbeatRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ private def createShareGroupToDescribe(): Unit = {
+ createTopicWithBrokerPrincipal(topic)
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), shareGroupResource)
+ addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), topicResource)
+ shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup)
+ val consumer = createShareConsumer()
+ consumer.subscribe(Collections.singleton(topic))
+ consumer.poll(Duration.ofMillis(500L))
+ removeAllClientAcls()
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum:
String): Unit = {
+ createShareGroupToDescribe()
+ addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource),
shareGroupResource)
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+ val request = shareGroupDescribeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupDescribeWithOperationAll(quorum: String): Unit = {
+ createShareGroupToDescribe()
+
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+ val request = shareGroupDescribeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = {
+ createShareGroupToDescribe()
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+ val request = shareGroupDescribeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum:
String): Unit = {
+ createShareGroupToDescribe()
+
+ val request = shareGroupDescribeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareFetchWithGroupReadAndTopicReadAcl(quorum: String): Unit = {
+ addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+ addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+ val request = createShareFetchRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareFetchWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+ val request = createShareFetchRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareFetchWithoutGroupReadOrTopicReadAcl(quorum: String): Unit = {
+ removeAllClientAcls()
+
+ val request = createShareFetchRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareFetchWithoutGroupReadAcl(quorum: String): Unit = {
+ addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+ val request = createShareFetchRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareFetchWithoutTopicReadAcl(quorum: String): Unit = {
+ createTopicWithBrokerPrincipal(topic)
+ addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+
+ val request = createShareFetchRequest
+ val response = connectAndReceive[ShareFetchResponse](request, listenerName
= listenerName)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED,
Errors.forCode(response.data.responses.get(0).partitions.get(0).errorCode))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareAcknowledgeWithGroupReadAndTopicReadAcl(quorum: String): Unit =
{
+ addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
+ addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+ val request = shareAcknowledgeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareAcknowledgeWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+ val request = shareAcknowledgeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareAcknowledgeWithoutGroupReadOrTopicReadAcl(quorum: String): Unit
= {
+ removeAllClientAcls()
+
+ val request = shareAcknowledgeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testShareAcknowledgeFetchWithoutGroupReadAcl(quorum: String): Unit = {
+ addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+ val request = shareAcknowledgeRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testInitializeShareGroupStateWithClusterAcl(quorum: String): Unit = {
+ addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+ val request = initializeShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testInitializeShareGroupStateWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+ val request = initializeShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testInitializeShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+ removeAllClientAcls()
+
+ val request = initializeShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testReadShareGroupStateWithClusterAcl(quorum: String): Unit = {
+ addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+ val request = readShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testReadShareGroupStateWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+ val request = readShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testReadShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+ removeAllClientAcls()
+
+ val request = readShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testWriteShareGroupStateWithClusterAcl(quorum: String): Unit = {
+ addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+ val request = writeShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testWriteShareGroupStateWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+ val request = writeShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testWriteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+ removeAllClientAcls()
+
+ val request = writeShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupStateWithClusterAcl(quorum: String): Unit = {
+ addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+ val request = deleteShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupStateWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+ val request = deleteShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupStateWithoutClusterAcl(quorum: String): Unit = {
+ removeAllClientAcls()
+
+ val request = deleteShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testReadShareGroupStateSummaryWithClusterAcl(quorum: String): Unit = {
+ addAndVerifyAcls(clusterAcl(clusterResource), clusterResource)
+
+ val request = readShareGroupStateSummaryRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testReadShareGroupStateSummaryWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), clusterResource)
+
+ val request = readShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testReadShareGroupStateSummaryWithoutClusterAcl(quorum: String): Unit = {
+ removeAllClientAcls()
+
+ val request = readShareGroupStateRequest
+ val resource = Set[ResourceType](CLUSTER)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def
testDescribeShareGroupOffsetsWithGroupDescribeAndTopicDescribeAcl(quorum:
String): Unit = {
+ addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource),
shareGroupResource)
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+ val request = describeShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDescribeShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+ val request = describeShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def
testDescribeShareGroupOffsetsWithoutGroupDescribeOrTopicDescribeAcl(quorum:
String): Unit = {
+ removeAllClientAcls()
+
+ val request = describeShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDescribeShareGroupOffsetsWithoutGroupDescribeAcl(quorum: String):
Unit = {
+ addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
+
+ val request = describeShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDescribeShareGroupOffsetsWithoutTopicDescribeAcl(quorum: String):
Unit = {
+ addAndVerifyAcls(shareGroupDescribeAcl(shareGroupResource),
shareGroupResource)
+
+ val request = describeShareGroupOffsetsRequest
+ val response =
connectAndReceive[DescribeShareGroupOffsetsResponse](request, listenerName =
listenerName)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED,
Errors.forCode(response.data.groups.get(0).topics.get(0).partitions.get(0).errorCode))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupOffsetsWithGroupDeleteAndTopicReadAcl(quorum:
String): Unit = {
+ addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource),
shareGroupResource)
+ addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+ val request = deleteShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupOffsetsWithOperationAll(quorum: String): Unit = {
+ val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
+ addAndVerifyAcls(Set(allowAllOpsAcl), shareGroupResource)
+ addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
+
+ val request = deleteShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupOffsetsWithoutGroupDeleteOrTopicReadAcl(quorum:
String): Unit = {
+ removeAllClientAcls()
+
+ val request = deleteShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupOffsetsWithoutGroupDeleteAcl(quorum: String): Unit =
{
+ addAndVerifyAcls(topicReadAcl(topicResource), topicResource)
+
+ val request = deleteShareGroupOffsetsRequest
+ val resource = Set[ResourceType](GROUP, TOPIC)
+ sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kip932"))
+ def testDeleteShareGroupOffsetsWithoutTopicReadAcl(quorum: String): Unit = {
+ addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource),
shareGroupResource)
+
+ val request = deleteShareGroupOffsetsRequest
+ val response = connectAndReceive[DeleteShareGroupOffsetsResponse](request,
listenerName = listenerName)
+ assertEquals(1, response.data.responses.size)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code,
response.data.responses.get(0).errorCode, s"Unexpected response $response")
+ }
+
private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
listenerName: ListenerName):
ConsumerGroupHeartbeatResponseData = {
val request = new ConsumerGroupHeartbeatRequest.Builder(