This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e73719d9624 KAFKA-18819 StreamsGroupHeartbeat API and
StreamsGroupDescribe API check topic describe (#19183)
e73719d9624 is described below
commit e73719d9624efa041498776a6bc0158cb82ac3d9
Author: Lan Ding <[email protected]>
AuthorDate: Thu Mar 20 03:42:05 2025 +0800
KAFKA-18819 StreamsGroupHeartbeat API and StreamsGroupDescribe API check
topic describe (#19183)
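This patch makes the StreamsGroupHeartbeat and StreamsGroupDescribe APIs
check topic Describe authorization. A heartbeat fails with
TOPIC_AUTHORIZATION_FAILED when any topic it requires is not authorized for
Describe, and a described group that uses such a topic is returned with
TOPIC_AUTHORIZATION_FAILED, with its members and topology omitted.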
Reviewers: Lucas Brutschy <[email protected]>
---
.../internals/DescribeStreamsGroupsHandler.java | 1 +
.../requests/StreamsGroupDescribeResponse.java | 1 +
.../message/StreamsGroupDescribeResponse.json | 1 +
.../message/StreamsGroupHeartbeatResponse.json | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 55 ++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 181 ++++++++++++++++++++-
6 files changed, 232 insertions(+), 9 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
index 8bf793bd31c..8355a78b9d4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
@@ -238,6 +238,7 @@ public class DescribeStreamsGroupsHandler extends
AdminApiHandler.Batched<Coordi
Set<CoordinatorKey> groupsToUnmap) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
+ case TOPIC_AUTHORIZATION_FAILED:
log.debug("`DescribeStreamsGroups` request for group id {}
failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception(errorMsg));
break;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
index 83db6700a4a..0439b955325 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
@@ -35,6 +35,7 @@ import java.util.Map;
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
*/
public class StreamsGroupDescribeResponse extends AbstractResponse {
diff --git
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 9cf2954c17f..5dff3d7bf44 100644
---
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -27,6 +27,7 @@
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
+ // - TOPIC_AUTHORIZATION_FAILED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index 43b5268e205..a5f3a99f9de 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -30,7 +30,7 @@
// - FENCED_MEMBER_EPOCH (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
- // - TOPIC_AUTHORIZATION_FAILED (version 0+)
+ // - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - CLUSTER_AUTHORIZATION_FAILED (version 0+)
// - STREAMS_INVALID_TOPOLOGY (version 0+)
// - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index adf5c5a6e53..e2d8e17f950 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2707,11 +2707,20 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, new
StreamsGroupHeartbeatResponse(errorResponse))
return CompletableFuture.completedFuture[Unit](())
}
+
+ if (requiredTopics.nonEmpty) {
+ val authorizedTopics =
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
requiredTopics)(identity)
+ if (authorizedTopics.size < requiredTopics.size) {
+ val responseData = new
StreamsGroupHeartbeatResponseData().setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ requestHelper.sendMaybeThrottle(request, new
StreamsGroupHeartbeatResponse(responseData))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+ }
}
groupCoordinator.streamsGroupHeartbeat(
request.context,
- streamsGroupHeartbeatRequest.data,
+ streamsGroupHeartbeatRequest.data
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request,
streamsGroupHeartbeatRequest.getErrorResponse(exception))
@@ -2795,6 +2804,50 @@ class KafkaApis(val requestChannel: RequestChannel,
response.groups.addAll(results)
}
+ // Clients are not allowed to see topics that are not authorized for
Describe.
+ if (authorizer.isDefined) {
+ val topicsToCheck = response.groups.stream()
+ .filter(group => group.topology != null)
+ .flatMap(group => group.topology.subtopologies.stream)
+ .flatMap(subtopology => java.util.stream.Stream.concat(
+ java.util.stream.Stream.concat(
+ java.util.stream.Stream.concat(
+ subtopology.sourceTopics.stream,
+ subtopology.repartitionSinkTopics.stream),
+ subtopology.repartitionSourceTopics.stream.map(_.name)),
+ subtopology.stateChangelogTopics.stream.map(_.name)))
+ .collect(Collectors.toSet[String])
+ .asScala
+
+ val authorizedTopics =
authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+ topicsToCheck)(identity)
+
+ val updatedGroups = response.groups.stream.map { group =>
+ val hasUnauthorizedTopic = if (group.topology == null) false
else
+ group.topology.subtopologies.stream()
+ .flatMap(subtopology => java.util.stream.Stream.concat(
+ java.util.stream.Stream.concat(
+ java.util.stream.Stream.concat(
+ subtopology.sourceTopics.stream,
+ subtopology.repartitionSinkTopics.stream),
+
subtopology.repartitionSourceTopics.stream.map(_.name)),
+ subtopology.stateChangelogTopics.stream.map(_.name)))
+ .anyMatch(topic => !authorizedTopics.contains(topic))
+
+ if (hasUnauthorizedTopic) {
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(group.groupId)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage("The described group uses topics that the
client is not authorized to describe.")
+ .setMembers(List.empty.asJava)
+ .setTopology(null)
+ } else {
+ group
+ }
+
}.collect(Collectors.toList[StreamsGroupDescribeResponseData.DescribedGroup])
+ response.setGroups(updatedGroups)
+ }
+
requestHelper.sendMaybeThrottle(request, new
StreamsGroupDescribeResponse(response))
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 70a64f47160..9c382d709c4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9883,7 +9883,7 @@ class KafkaApisTest extends Logging {
}
@Test
- def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+ def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -9903,6 +9903,58 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
response.data.errorCode)
}
+ @Test
+ def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+ metadataCache = mock(classOf[KRaftMetadataCache])
+ val groupId = "group"
+ val fooTopicName = "foo"
+ val barTopicName = "bar"
+ val zarTopicName = "zar"
+ val tarTopicName = "tar"
+
+ val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology(
+ new StreamsGroupHeartbeatRequestData.Topology()
+ .setEpoch(3)
+ .setSubtopologies(
+ util.List.of(new
StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
+ .setSourceTopics(Collections.singletonList(fooTopicName))
+ .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+ .setRepartitionSourceTopics(Collections.singletonList(new
StreamsGroupHeartbeatRequestData.TopicInfo().setName(zarTopicName)))
+ .setStateChangelogTopics(Collections.singletonList(new
StreamsGroupHeartbeatRequestData.TopicInfo().setName(tarTopicName)))
+ )
+ )
+ )
+
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val acls = Map(
+ groupId -> AuthorizationResult.ALLOWED,
+ fooTopicName -> AuthorizationResult.ALLOWED,
+ barTopicName -> AuthorizationResult.DENIED,
+ zarTopicName -> AuthorizationResult.ALLOWED,
+ tarTopicName -> AuthorizationResult.ALLOWED
+ )
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+
+ kafkaApis = createKafkaApis(
+ authorizer = Some(authorizer),
+ overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response =
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code,
response.data.errorCode)
+ }
+
@Test
def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
@@ -10230,6 +10282,8 @@ class KafkaApisTest extends Logging {
@ValueSource(booleans = Array(true, false))
def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
+ val fooTopicName = "foo"
+ val barTopicName = "bar"
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@@ -10247,10 +10301,32 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+ val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId("subtopology0")
+ .setSourceTopics(Collections.singletonList(fooTopicName))
+
+ val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId("subtopology1")
+ .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
+ val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId("subtopology2")
+ .setSourceTopics(Collections.singletonList(fooTopicName))
+ .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
future.complete(List(
- new
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
- new
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
- new
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(0))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology0))),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(1))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology1))),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(2))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology2)))
).asJava)
var authorizedOperationsInt = Int.MinValue;
@@ -10262,9 +10338,18 @@ class KafkaApisTest extends Logging {
// Can't reuse the above list here because we would not test the
implementation in KafkaApis then
val describedGroups = List(
- new
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
- new
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
- new
StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(0))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology0))),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(1))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology1))),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(2))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology2)))
).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
val expectedStreamsGroupDescribeResponseData = new
StreamsGroupDescribeResponseData()
.setGroups(describedGroups.asJava)
@@ -10353,6 +10438,88 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.FENCED_MEMBER_EPOCH.code,
response.data.groups.get(0).errorCode)
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStreamsGroupDescribeFilterUnauthorizedTopics(includeAuthorizedOperations:
Boolean): Unit = {
+ val fooTopicName = "foo"
+ val barTopicName = "bar"
+ val errorMessage = "The described group uses topics that the client is not
authorized to describe."
+
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+ val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+ .setIncludeAuthorizedOperations(includeAuthorizedOperations)
+ streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
+ val requestChannelRequest = buildRequest(new
StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData,
true).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val acls = Map(
+ groupIds.get(0) -> AuthorizationResult.ALLOWED,
+ groupIds.get(1) -> AuthorizationResult.ALLOWED,
+ groupIds.get(2) -> AuthorizationResult.ALLOWED,
+ fooTopicName -> AuthorizationResult.ALLOWED,
+ barTopicName -> AuthorizationResult.DENIED,
+ )
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+
+ val future = new
CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.streamsGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis(
+ authorizer = Some(authorizer),
+ overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId("subtopology0")
+ .setSourceTopics(Collections.singletonList(fooTopicName))
+
+ val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId("subtopology1")
+ .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
+ val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId("subtopology2")
+ .setSourceTopics(Collections.singletonList(fooTopicName))
+ .setRepartitionSinkTopics(Collections.singletonList(barTopicName))
+
+ future.complete(List(
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(0))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology0))),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(1))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology1))),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupIds.get(2))
+ .setTopology(new StreamsGroupDescribeResponseData.Topology()
+ .setSubtopologies(Collections.singletonList(subtopology2)))
+ ).asJava)
+
+ val response =
verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+ assertNotNull(response.data)
+ assertEquals(3, response.data.groups.size)
+ assertEquals(Errors.NONE.code(), response.data.groups.get(0).errorCode())
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
response.data.groups.get(1).errorCode())
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
response.data.groups.get(2).errorCode())
+ assertEquals(errorMessage, response.data.groups.get(1).errorMessage())
+ assertEquals(errorMessage, response.data.groups.get(2).errorMessage())
+ }
+
@Test
def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
val fooTopicName = "foo"