This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new db3bf4ae3d6 KAFKA-14509; [4/4] Handle includeAuthorizedOperations in ConsumerGroupDescribe API (#16158)
db3bf4ae3d6 is described below

commit db3bf4ae3d617f17b6eb0913eb9f98e340dfb68a
Author: Max Riedel <29626305+riedel...@users.noreply.github.com>
AuthorDate: Mon Jun 10 14:07:45 2024 +0200

    KAFKA-14509; [4/4] Handle includeAuthorizedOperations in ConsumerGroupDescribe API (#16158)
    
    This patch implements the handling of the `includeAuthorizedOperations` flag in the ConsumerGroupDescribe API.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 12 ++++++++++
 .../server/ConsumerGroupDescribeRequestsTest.scala | 15 +++++++++---
 .../server/GroupCoordinatorBaseRequestTest.scala   |  5 +++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 28 +++++++++++++++++-----
 4 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8c9aada8998..268205dde6f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3831,6 +3831,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleConsumerGroupDescribe(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val consumerGroupDescribeRequest = 
request.body[ConsumerGroupDescribeRequest]
+    val includeAuthorizedOperations = 
consumerGroupDescribeRequest.data.includeAuthorizedOperations
 
     if (!isConsumerGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
@@ -3859,6 +3860,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
consumerGroupDescribeRequest.getErrorResponse(exception))
         } else {
+          if (includeAuthorizedOperations) {
+            results.forEach { groupResult =>
+              if (groupResult.errorCode == Errors.NONE.code) {
+                
groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
+                  request,
+                  new Resource(ResourceType.GROUP, groupResult.groupId)
+                ))
+              }
+            }
+          }
+
           if (response.groups.isEmpty) {
             // If the response is empty, we can directly reuse the results.
             response.setGroups(results)
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala
index b1f8b8405e7..0e745f33d5b 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala
@@ -16,9 +16,8 @@
  */
 package kafka.server
 
-import kafka.server.GroupCoordinatorBaseRequestTest
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, 
ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation._
 import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.common.ConsumerGroupState
@@ -26,11 +25,15 @@ import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assign
 import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, 
ConsumerGroupDescribeResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.common.Features
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{Tag, Timeout}
 
+import java.lang.{Byte => JByte}
 import scala.jdk.CollectionConverters._
 
 @Timeout(120)
@@ -116,6 +119,9 @@ class ConsumerGroupDescribeRequestsTest(cluster: 
ClusterInstance) extends GroupC
     val timeoutMs = 5 * 60 * 1000
     val clientId = "client-id"
     val clientHost = "/127.0.0.1"
+    val authorizedOperationsInt = Utils.to32BitField(
+      AclEntry.supportedOperations(ResourceType.GROUP).asScala
+        .map(_.code.asInstanceOf[JByte]).asJava)
 
     // Add first group with one member.
     var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
@@ -162,6 +168,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: 
ClusterInstance) extends GroupC
           .setGroupEpoch(1)
           .setAssignmentEpoch(1)
           .setAssignorName("uniform")
+          .setAuthorizedOperations(authorizedOperationsInt)
           .setMembers(List(
             new ConsumerGroupDescribeResponseData.Member()
               .setMemberId(grp1Member1Response.memberId)
@@ -177,6 +184,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: 
ClusterInstance) extends GroupC
           .setGroupEpoch(grp2Member2Response.memberEpoch)
           .setAssignmentEpoch(grp2Member2Response.memberEpoch)
           .setAssignorName("range")
+          .setAuthorizedOperations(authorizedOperationsInt)
           .setMembers(List(
             new ConsumerGroupDescribeResponseData.Member()
               .setMemberId(grp2Member2Response.memberId)
@@ -219,7 +227,8 @@ class ConsumerGroupDescribeRequestsTest(cluster: 
ClusterInstance) extends GroupC
 
       val actual = consumerGroupDescribe(
         groupIds = List("grp-1", "grp-2"),
-        version = version.toShort
+        includeAuthorizedOperations = true,
+        version = version.toShort,
       )
 
       assertEquals(expected, actual)
diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 847bdf3225f..9fad21476e7 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -421,10 +421,13 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
 
   protected def consumerGroupDescribe(
     groupIds: List[String],
+    includeAuthorizedOperations: Boolean,
     version: Short = 
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
   ): List[ConsumerGroupDescribeResponseData.DescribedGroup] = {
     val consumerGroupDescribeRequest = new 
ConsumerGroupDescribeRequest.Builder(
-      new ConsumerGroupDescribeRequestData().setGroupIds(groupIds.asJava)
+      new ConsumerGroupDescribeRequestData()
+        .setGroupIds(groupIds.asJava)
+        .setIncludeAuthorizedOperations(includeAuthorizedOperations)
     ).build(version)
 
     val consumerGroupDescribeResponse = 
connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bd0ded29deb..2113a46124c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,6 +76,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, 
SecurityUtils, Utils}
 import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.ClientMetricsManager
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, 
Authorizer}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, 
IBP_2_2_IV1}
@@ -92,6 +93,7 @@ import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
+import java.lang.{Byte => JByte}
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.time.Duration
@@ -7115,8 +7117,9 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
-  @Test
-  def testConsumerGroupDescribe(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
     when(metadataCache.features()).thenReturn {
       new FinalizedFeatures(
@@ -7129,6 +7132,7 @@ class KafkaApisTest extends Logging {
 
     val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
     val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+      .setIncludeAuthorizedOperations(includeAuthorizedOperations)
     consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
     val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
 
@@ -7143,15 +7147,27 @@ class KafkaApisTest extends Logging {
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
 
-    val describedGroups = List(
+    future.complete(List(
       new DescribedGroup().setGroupId(groupIds.get(0)),
       new DescribedGroup().setGroupId(groupIds.get(1)),
       new DescribedGroup().setGroupId(groupIds.get(2))
-    ).asJava
+    ).asJava)
 
-    future.complete(describedGroups)
+    var authorizedOperationsInt = Int.MinValue;
+    if (includeAuthorizedOperations) {
+      authorizedOperationsInt = Utils.to32BitField(
+        AclEntry.supportedOperations(ResourceType.GROUP).asScala
+          .map(_.code.asInstanceOf[JByte]).asJava)
+    }
+
+    // Can't reuse the above list here because we would not test the 
implementation in KafkaApis then
+    val describedGroups = List(
+      new DescribedGroup().setGroupId(groupIds.get(0)),
+      new DescribedGroup().setGroupId(groupIds.get(1)),
+      new DescribedGroup().setGroupId(groupIds.get(2))
+    ).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
     val expectedConsumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()
-      .setGroups(describedGroups)
+      .setGroups(describedGroups.asJava)
 
     val response = 
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
 

Reply via email to