This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe88232b07c KAFKA-17750 Extend kafka-consumer-groups command line tool 
to support new consumer group (part 1) (#17958)
fe88232b07c is described below

commit fe88232b07cd1954793cc64eea0ec735257fb622
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Dec 4 06:08:39 2024 +0800

    KAFKA-17750 Extend kafka-consumer-groups command line tool to support new 
consumer group (part 1) (#17958)
    
    1) Bump validVersions of ConsumerGroupDescribeRequest.json and 
ConsumerGroupDescribeResponse.json to "0-1".
    
    2) Add MemberType field to ConsumerGroupDescribeResponse.json. Default 
value is -1 (unknown). 0 is for classic member and 1 is for consumer member.
    
    3) When ConsumerGroupMember#useClassicProtocol is true, return MemberType 
field as 0. Otherwise, return 1.
    
    Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../message/ConsumerGroupDescribeRequest.json      |   4 +-
 .../message/ConsumerGroupDescribeResponse.json     |   7 +-
 .../server/ConsumerGroupDescribeRequestTest.scala  | 118 ++++++++++++++++++++-
 .../group/modern/consumer/ConsumerGroupMember.java |   3 +-
 .../modern/consumer/ConsumerGroupMemberTest.java   |  15 ++-
 .../group/modern/consumer/ConsumerGroupTest.java   |   4 +-
 6 files changed, 138 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json 
b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json
index a581d15dee3..ff404a0966f 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json
@@ -18,7 +18,9 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "ConsumerGroupDescribeRequest",
-  "validVersions": "0",
+  // Version 1 adds MemberType field to ConsumerGroupDescribeResponse 
(KIP-1099).
+  // For ConsumerGroupDescribeRequest, version 1 is the same as version 0.
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": 
"groupId",
diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
index 3c6ed4e78de..14d80e20ce2 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
@@ -17,7 +17,8 @@
   "apiKey": 69,
   "type": "response",
   "name": "ConsumerGroupDescribeResponse",
-  "validVersions": "0",
+  // Version 1 adds MemberType field (KIP-1099).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -69,7 +70,9 @@
             { "name": "Assignment", "type": "Assignment", "versions": "0+",
               "about": "The current assignment." },
             { "name": "TargetAssignment", "type": "Assignment", "versions": 
"0+",
-              "about": "The target assignment." }
+              "about": "The target assignment." },
+            { "name": "MemberType", "type": "int8", "versions": "1+", 
"default": "-1", "ignorable": true,
+              "about": "-1 for unknown. 0 for classic member. +1 for consumer 
member." }
           ]},
         { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", 
"default": "-2147483648",
           "about": "32-bit bitfield to represent authorized operations for 
this group." }
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index ad98d2d875e..47e287a7166 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -20,7 +20,9 @@ import org.apache.kafka.common.test.api.ClusterInstance
 import org.apache.kafka.common.test.api._
 import org.apache.kafka.common.test.api.ClusterTestExtensions
 import kafka.utils.TestUtils
-import org.apache.kafka.common.{ConsumerGroupState, Uuid}
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, 
DescribedGroup, TopicPartitions}
 import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -30,10 +32,11 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.common.Features
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.lang.{Byte => JByte}
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@@ -153,6 +156,7 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
                 .setClientHost(clientHost)
                 .setSubscribedTopicRegex("")
                 .setSubscribedTopicNames(List("bar").asJava)
+                .setMemberType(if (version == 0) -1.toByte else 1.toByte)
             ).asJava),
           new DescribedGroup()
             .setGroupId("grp-2")
@@ -176,7 +180,8 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
                       .setTopicId(topicId)
                       .setTopicName("foo")
                       .setPartitions(List[Integer](2).asJava)
-                  ).asJava)),
+                  ).asJava))
+                .setMemberType(if (version == 0) -1.toByte else 1.toByte),
               new ConsumerGroupDescribeResponseData.Member()
                 .setMemberId(grp2Member1Response.memberId)
                 .setMemberEpoch(grp2Member1Response.memberEpoch)
@@ -197,7 +202,8 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
                       .setTopicId(topicId)
                       .setTopicName("foo")
                       .setPartitions(List[Integer](0, 1).asJava)
-                  ).asJava)),
+                  ).asJava))
+                .setMemberType(if (version == 0) -1.toByte else 1.toByte),
             ).asJava),
         )
 
@@ -213,4 +219,108 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       admin.close()
     }
   }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testConsumerGroupDescribeWithMigrationMember(): Unit = {
+    // Create the __consumer_offsets topic explicitly: it is not created 
automatically
+    // in this test because the test does not use the FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    val topicName = "foo"
+    createTopic(
+      topic = topicName,
+      numPartitions = 3
+    )
+
+    val groupId = "grp"
+
+    // Classic member 1 joins the classic group.
+    val memberId1 = joinDynamicConsumerGroupWithOldProtocol(
+      groupId = groupId,
+      metadata = ConsumerProtocol.serializeSubscription(
+        new ConsumerPartitionAssignor.Subscription(
+          Collections.singletonList(topicName),
+          null,
+          List().asJava
+        )
+      ).array,
+      assignment = ConsumerProtocol.serializeAssignment(
+        new ConsumerPartitionAssignor.Assignment(
+          List(0, 1, 2).map(p => new TopicPartition(topicName, p)).asJava
+        )
+      ).array
+    )._1
+
+    // Consumer group member 2 sends a joining heartbeat, which is accepted.
+    val memberId2 = consumerGroupHeartbeat(
+      groupId = groupId,
+      memberId = "member-2",
+      rebalanceTimeoutMs = 5 * 60 * 1000,
+      subscribedTopicNames = List(topicName),
+      topicPartitions = List.empty,
+      expectedError = Errors.NONE
+    ).memberId
+
+    for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to 
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
+      val actual = consumerGroupDescribe(
+        groupIds = List(groupId),
+        includeAuthorizedOperations = true,
+        version = version.toShort,
+      )
+      assertEquals(1, actual.size)
+      val group = actual.head
+      val member1 = group.members.asScala.find(_.memberId == memberId1)
+      assertFalse(member1.isEmpty)
+      // Version 0 doesn't have memberType field, so memberType field on 
member 1 is -1 (unknown).
+      // After version 1, there is memberType field and it should be 0 
(classic) for member 1.
+      assertEquals(if (version == 0) -1.toByte else 0.toByte, 
member1.get.memberType)
+
+      val member2 = group.members.asScala.find(_.memberId == memberId2)
+      assertFalse(member2.isEmpty)
+      assertEquals(if (version == 0) -1.toByte else 1.toByte, 
member2.get.memberType)
+    }
+
+    // Classic member 1 leaves group.
+    leaveGroup(
+      groupId = groupId,
+      memberId = memberId1,
+      useNewProtocol = false,
+      version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
+    )
+
+    // Member 1 joins as consumer group member.
+    consumerGroupHeartbeat(
+      groupId = groupId,
+      memberId = memberId1,
+      rebalanceTimeoutMs = 5 * 60 * 1000,
+      subscribedTopicNames = List(topicName),
+      topicPartitions = List.empty,
+      expectedError = Errors.NONE
+    )
+
+    // There is no classic member in the group.
+    for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to 
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
+      val actual = consumerGroupDescribe(
+        groupIds = List(groupId),
+        includeAuthorizedOperations = true,
+        version = version.toShort,
+      )
+      assertEquals(1, actual.size)
+      val group = actual.head
+      val member1 = group.members.asScala.find(_.memberId == memberId1)
+      assertFalse(member1.isEmpty)
+      assertEquals(if (version == 0) -1.toByte else 1.toByte, 
member1.get.memberType)
+
+      val member2 = group.members.asScala.find(_.memberId == memberId2)
+      assertFalse(member2.isEmpty)
+      assertEquals(if (version == 0) -1.toByte else 1.toByte, 
member2.get.memberType)
+    }
+  }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
index 2f04931bda4..69fa5cdd50c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
@@ -408,7 +408,8 @@ public class ConsumerGroupMember extends ModernGroupMember {
             .setInstanceId(instanceId)
             .setRackId(rackId)
             .setSubscribedTopicNames(subscribedTopicNames == null ? null : new 
ArrayList<>(subscribedTopicNames))
-            .setSubscribedTopicRegex(subscribedTopicRegex);
+            .setSubscribedTopicRegex(subscribedTopicRegex)
+            .setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
     }
 
     private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromMap(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
index 96122e79de3..658e6c23260 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.image.MetadataImage;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -246,8 +248,9 @@ public class ConsumerGroupMemberTest {
         assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), 
member.partitionsPendingRevocation());
     }
 
-    @Test
-    public void testAsConsumerGroupDescribeMember() {
+    @ParameterizedTest(name = "{displayName}.withClassicMemberMetadata={0}")
+    @ValueSource(booleans = {true, false})
+    public void testAsConsumerGroupDescribeMember(boolean 
withClassicMemberMetadata) {
         Uuid topicId1 = Uuid.randomUuid();
         Uuid topicId2 = Uuid.randomUuid();
         Uuid topicId3 = Uuid.randomUuid();
@@ -287,6 +290,8 @@ public class ConsumerGroupMemberTest {
             .setClientHost(clientHost)
             .setSubscribedTopicNames(subscribedTopicNames)
             .setSubscribedTopicRegex(subscribedTopicRegex)
+            .setClassicMemberMetadata(withClassicMemberMetadata ? new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(toClassicProtocolCollection("range")) : 
null)
             .build();
 
         ConsumerGroupDescribeResponseData.Member actual = 
member.asConsumerGroupDescribeMember(targetAssignment, metadataImage.topics());
@@ -315,7 +320,8 @@ public class ConsumerGroupMemberTest {
                             .setTopicName("topic4")
                             .setPartitions(new ArrayList<>(item.getValue()))
                     ).collect(Collectors.toList()))
-            );
+            )
+            .setMemberType(withClassicMemberMetadata ? (byte) 0 : (byte) 1);
 
         assertEquals(expected, actual);
     }
@@ -344,7 +350,8 @@ public class ConsumerGroupMemberTest {
 
         ConsumerGroupDescribeResponseData.Member expected = new 
ConsumerGroupDescribeResponseData.Member()
             .setMemberId(memberId.toString())
-            .setSubscribedTopicRegex("");
+            .setSubscribedTopicRegex("")
+            .setMemberType((byte) 1);
         ConsumerGroupDescribeResponseData.Member actual = 
member.asConsumerGroupDescribeMember(null,
             new MetadataImageBuilder()
                 .addTopic(Uuid.randomUuid(), "foo", 3)
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index fc589df97a7..9c829630f09 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1283,9 +1283,11 @@ public class ConsumerGroupTest {
                 new ConsumerGroupDescribeResponseData.Member()
                     .setMemberId("member1")
                     .setSubscribedTopicNames(Collections.singletonList("foo"))
-                    .setSubscribedTopicRegex(""),
+                    .setSubscribedTopicRegex("")
+                    .setMemberType((byte) 1),
                 new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
                     .setSubscribedTopicRegex("")
+                    .setMemberType((byte) 1)
             ));
         ConsumerGroupDescribeResponseData.DescribedGroup actual = 
group.asDescribedGroup(1, "",
             new MetadataImageBuilder().build().topics());

Reply via email to