This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fe88232b07c KAFKA-17750 Extend kafka-consumer-groups command line tool
to support new consumer group (part 1) (#17958)
fe88232b07c is described below
commit fe88232b07cd1954793cc64eea0ec735257fb622
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Dec 4 06:08:39 2024 +0800
KAFKA-17750 Extend kafka-consumer-groups command line tool to support new
consumer group (part 1) (#17958)
1) Bump validVersions of ConsumerGroupDescribeRequest.json and
ConsumerGroupDescribeResponse.json to "0-1".
2) Add MemberType field to ConsumerGroupDescribeResponse.json. Default
value is -1 (unknown). 0 is for classic member and 1 is for consumer member.
3) When ConsumerGroupMember#useClassicProtocol is true, return MemberType
field as 0. Otherwise, return 1.
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../message/ConsumerGroupDescribeRequest.json | 4 +-
.../message/ConsumerGroupDescribeResponse.json | 7 +-
.../server/ConsumerGroupDescribeRequestTest.scala | 118 ++++++++++++++++++++-
.../group/modern/consumer/ConsumerGroupMember.java | 3 +-
.../modern/consumer/ConsumerGroupMemberTest.java | 15 ++-
.../group/modern/consumer/ConsumerGroupTest.java | 4 +-
6 files changed, 138 insertions(+), 13 deletions(-)
diff --git
a/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json
b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json
index a581d15dee3..ff404a0966f 100644
---
a/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json
+++
b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json
@@ -18,7 +18,9 @@
"type": "request",
"listeners": ["broker"],
"name": "ConsumerGroupDescribeRequest",
- "validVersions": "0",
+ // Version 1 adds MemberType field to ConsumerGroupDescribeResponse
(KIP-1099).
+ // For ConsumerGroupDescribeRequest, version 1 is same as version 0.
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType":
"groupId",
diff --git
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
index 3c6ed4e78de..14d80e20ce2 100644
---
a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
+++
b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json
@@ -17,7 +17,8 @@
"apiKey": 69,
"type": "response",
"name": "ConsumerGroupDescribeResponse",
- "validVersions": "0",
+ // Version 1 adds MemberType field (KIP-1099).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -69,7 +70,9 @@
{ "name": "Assignment", "type": "Assignment", "versions": "0+",
"about": "The current assignment." },
{ "name": "TargetAssignment", "type": "Assignment", "versions":
"0+",
- "about": "The target assignment." }
+ "about": "The target assignment." },
+ { "name": "MemberType", "type": "int8", "versions": "1+",
"default": "-1", "ignorable": true,
+ "about": "-1 for unknown. 0 for classic member. +1 for consumer
member." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+",
"default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for
this group." }
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index ad98d2d875e..47e287a7166 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -20,7 +20,9 @@ import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api._
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
-import org.apache.kafka.common.{ConsumerGroupState, Uuid}
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, Uuid}
import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment,
DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData,
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -30,10 +32,11 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Features
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.junit.jupiter.api.extension.ExtendWith
import java.lang.{Byte => JByte}
+import java.util.Collections
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@@ -153,6 +156,7 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
+ .setMemberType(if (version == 0) -1.toByte else 1.toByte)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
@@ -176,7 +180,8 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
- ).asJava)),
+ ).asJava))
+ .setMemberType(if (version == 0) -1.toByte else 1.toByte),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
@@ -197,7 +202,8 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
- ).asJava)),
+ ).asJava))
+ .setMemberType(if (version == 0) -1.toByte else 1.toByte),
).asJava),
)
@@ -213,4 +219,108 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
admin.close()
}
}
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def testConsumerGroupDescribeWithMigrationMember(): Unit = {
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test, since the test does not use the FindCoordinator API.
+ createOffsetsTopic()
+
+ // Create the topic.
+ val topicName = "foo"
+ createTopic(
+ topic = topicName,
+ numPartitions = 3
+ )
+
+ val groupId = "grp"
+
+ // Classic member 1 joins the classic group.
+ val memberId1 = joinDynamicConsumerGroupWithOldProtocol(
+ groupId = groupId,
+ metadata = ConsumerProtocol.serializeSubscription(
+ new ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList(topicName),
+ null,
+ List().asJava
+ )
+ ).array,
+ assignment = ConsumerProtocol.serializeAssignment(
+ new ConsumerPartitionAssignor.Assignment(
+ List(0, 1, 2).map(p => new TopicPartition(topicName, p)).asJava
+ )
+ ).array
+ )._1
+
+ // The joining request with a consumer group member 2 is accepted.
+ val memberId2 = consumerGroupHeartbeat(
+ groupId = groupId,
+ memberId = "member-2",
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List(topicName),
+ topicPartitions = List.empty,
+ expectedError = Errors.NONE
+ ).memberId
+
+ for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
+ val actual = consumerGroupDescribe(
+ groupIds = List(groupId),
+ includeAuthorizedOperations = true,
+ version = version.toShort,
+ )
+ assertEquals(1, actual.size)
+ val group = actual.head
+ val member1 = group.members.asScala.find(_.memberId == memberId1)
+ assertFalse(member1.isEmpty)
+ // Version 0 doesn't have memberType field, so memberType field on
member 1 is -1 (unknown).
+ // From version 1 onwards, the memberType field is present and it should be 0
(classic) for member 1.
+ assertEquals(if (version == 0) -1.toByte else 0.toByte,
member1.get.memberType)
+
+ val member2 = group.members.asScala.find(_.memberId == memberId2)
+ assertFalse(member2.isEmpty)
+ assertEquals(if (version == 0) -1.toByte else 1.toByte,
member2.get.memberType)
+ }
+
+ // Classic member 1 leaves group.
+ leaveGroup(
+ groupId = groupId,
+ memberId = memberId1,
+ useNewProtocol = false,
+ version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
+ )
+
+ // Member 1 joins as consumer group member.
+ consumerGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId1,
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List(topicName),
+ topicPartitions = List.empty,
+ expectedError = Errors.NONE
+ )
+
+ // There is no classic member in the group.
+ for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
+ val actual = consumerGroupDescribe(
+ groupIds = List(groupId),
+ includeAuthorizedOperations = true,
+ version = version.toShort,
+ )
+ assertEquals(1, actual.size)
+ val group = actual.head
+ val member1 = group.members.asScala.find(_.memberId == memberId1)
+ assertFalse(member1.isEmpty)
+ assertEquals(if (version == 0) -1.toByte else 1.toByte,
member1.get.memberType)
+
+ val member2 = group.members.asScala.find(_.memberId == memberId2)
+ assertFalse(member2.isEmpty)
+ assertEquals(if (version == 0) -1.toByte else 1.toByte,
member2.get.memberType)
+ }
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
index 2f04931bda4..69fa5cdd50c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
@@ -408,7 +408,8 @@ public class ConsumerGroupMember extends ModernGroupMember {
.setInstanceId(instanceId)
.setRackId(rackId)
.setSubscribedTopicNames(subscribedTopicNames == null ? null : new
ArrayList<>(subscribedTopicNames))
- .setSubscribedTopicRegex(subscribedTopicRegex);
+ .setSubscribedTopicRegex(subscribedTopicRegex)
+ .setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
}
private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromMap(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
index 96122e79de3..658e6c23260 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -246,8 +248,9 @@ public class ConsumerGroupMemberTest {
assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)),
member.partitionsPendingRevocation());
}
- @Test
- public void testAsConsumerGroupDescribeMember() {
+ @ParameterizedTest(name = "{displayName}.withClassicMemberMetadata={0}")
+ @ValueSource(booleans = {true, false})
+ public void testAsConsumerGroupDescribeMember(boolean
withClassicMemberMetadata) {
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
Uuid topicId3 = Uuid.randomUuid();
@@ -287,6 +290,8 @@ public class ConsumerGroupMemberTest {
.setClientHost(clientHost)
.setSubscribedTopicNames(subscribedTopicNames)
.setSubscribedTopicRegex(subscribedTopicRegex)
+ .setClassicMemberMetadata(withClassicMemberMetadata ? new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSupportedProtocols(toClassicProtocolCollection("range")) :
null)
.build();
ConsumerGroupDescribeResponseData.Member actual =
member.asConsumerGroupDescribeMember(targetAssignment, metadataImage.topics());
@@ -315,7 +320,8 @@ public class ConsumerGroupMemberTest {
.setTopicName("topic4")
.setPartitions(new ArrayList<>(item.getValue()))
).collect(Collectors.toList()))
- );
+ )
+ .setMemberType(withClassicMemberMetadata ? (byte) 0 : (byte) 1);
assertEquals(expected, actual);
}
@@ -344,7 +350,8 @@ public class ConsumerGroupMemberTest {
ConsumerGroupDescribeResponseData.Member expected = new
ConsumerGroupDescribeResponseData.Member()
.setMemberId(memberId.toString())
- .setSubscribedTopicRegex("");
+ .setSubscribedTopicRegex("")
+ .setMemberType((byte) 1);
ConsumerGroupDescribeResponseData.Member actual =
member.asConsumerGroupDescribeMember(null,
new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "foo", 3)
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index fc589df97a7..9c829630f09 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1283,9 +1283,11 @@ public class ConsumerGroupTest {
new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
- .setSubscribedTopicRegex(""),
+ .setSubscribedTopicRegex("")
+ .setMemberType((byte) 1),
new
ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
.setSubscribedTopicRegex("")
+ .setMemberType((byte) 1)
));
ConsumerGroupDescribeResponseData.DescribedGroup actual =
group.asDescribedGroup(1, "",
new MetadataImageBuilder().build().topics());