dongnuo123 commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1375454303
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3686,8 +3686,51 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleConsumerGroupDescribe(request: RequestChannel.Request):
CompletableFuture[Unit] = {
- requestHelper.sendMaybeThrottle(request,
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+ val consumerGroupDescribeRequest =
request.body[ConsumerGroupDescribeRequest]
+
+ if (!config.isNewGroupCoordinatorEnabled) {
+ // The API is not supported by the "old" group coordinator (the
default). If the
+ // new one is not enabled, we fail directly here.
+ requestHelper.sendMaybeThrottle(request,
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ val response = new ConsumerGroupDescribeResponseData()
+
+ val authorizedGroups = new ArrayBuffer[String]()
+ consumerGroupDescribeRequest.data.groupIds.forEach { groupId =>
+ if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
+ response.groups.add(new
ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+ .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message)
+ )
+ } else {
+ authorizedGroups += groupId
+ }
+ }
+
+ val future = groupCoordinator.consumerGroupDescribe(
+ request.context,
+ authorizedGroups.asJava
+ )
+
+ future.handle[Unit] { (results, exception) =>
Review Comment:
nit: Have we used `future` anywhere else? Could we write it like
```
groupCoordinator.consumerGroupDescribe(
request.context,
authorizedGroups.asJava
).handle[Unit] {...}
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -445,6 +446,44 @@ public List<ListGroupsResponseData.ListedGroup>
listGroups(List<String> statesFi
return groupStream.map(group ->
group.asListedGroup(committedOffset)).collect(Collectors.toList());
}
+
+ public List<ConsumerGroupDescribeResponseData.DescribedGroup>
consumerGroupDescribe(
+ List<String> groupIds,
+ long committedOffset
+ ) {
+ List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new
ArrayList<>();
+
+ for (String groupId: groupIds) {
+ Group group = groups.get(groupId, committedOffset);
+
+ ConsumerGroupDescribeResponseData.DescribedGroup describedGroup =
new ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId);
+
+ if (group == null || !CONSUMER.equals(group.type())) {
+ // We don't support upgrading/downgrading between protocols at
the moment so
+ // we set an error if a group exists with the wrong type.
+
describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());
+ describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code());
+ } else {
+ ConsumerGroup consumerGroup = (ConsumerGroup) group;
+ describedGroup.setGroupState(consumerGroup.stateAsString())
+ .setGroupEpoch(consumerGroup.groupEpoch())
+ .setAssignmentEpoch(consumerGroup.assignmentEpoch())
+ .setAssignorName(
+ consumerGroup.preferredServerAssignor().isPresent() ?
+ consumerGroup.preferredServerAssignor().get() :
null
+ );
+ consumerGroup.members().forEach(
+ (id, member) ->
describedGroup.members().add(member.asConsumerGroupDescribeMember())
+ );
Review Comment:
Attributes like group state, epoch, and members are timeline objects in
ConsumerGroup, so you can read them as of the committedOffset. An example is
`public String stateAsString(long committedOffset)` in ConsumerGroup.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,68 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testConsumerGroupDescribeNoErrors() {
+ String consumerGroupId = "consumerGroupId";
+ int epoch = 10;
+ String memberId = Uuid.randomUuid().toString();
+ String topicName = "topicName";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId,
epoch))
+ .build();
+
+ ConsumerGroupMember.Builder memberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setSubscribedTopicNames(Collections.singletonList(topicName));
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(
+ consumerGroupId,
+ memberBuilder.build()
+ ));
+ context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId,
epoch + 1));
+
+ List<ConsumerGroupDescribeResponseData.DescribedGroup> actual =
context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId));
+ ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new
ConsumerGroupDescribeResponseData.DescribedGroup();
+ describedGroup.setGroupEpoch(epoch + 1);
+ describedGroup.setGroupId(consumerGroupId);
+
describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember()));
+ describedGroup.setAssignorName(null);
+ describedGroup.setGroupState("assigning");
+ List<ConsumerGroupDescribeResponseData.DescribedGroup> expected =
Collections.singletonList(
+ describedGroup
+ );
+
+ assertEquals(expected, actual);
+ }
Review Comment:
Could we add more tests for describing groups by committedOffset?
For instance, if we've added a group but not updated the last committed offset,
then we shouldn't be able to describe this group. After updating the last
committed offset, we should be able to describe the group.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -444,6 +445,43 @@ public List<ListGroupsResponseData.ListedGroup>
listGroups(List<String> statesFi
return groupStream.map(group ->
group.asListedGroup(committedOffset)).collect(Collectors.toList());
}
+
+ public List<ConsumerGroupDescribeResponseData.DescribedGroup>
consumerGroupDescribe(
+ List<String> groupIds
+ ) {
+ List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new
ArrayList<>();
+
+ for (String groupId: groupIds) {
+ Group group = groups.get(groupId);
+
+ ConsumerGroupDescribeResponseData.DescribedGroup describedGroup =
new ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId);
+
+ if (group == null || !CONSUMER.equals(group.type())) {
+ // We don't support upgrading/downgrading between protocols at
the moment so
+ // we set an error if a group exists with the wrong type.
+
describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());
+ describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code());
+ } else {
+ ConsumerGroup consumerGroup = (ConsumerGroup) group;
+ describedGroup.setGroupState(consumerGroup.stateAsString())
+ .setGroupEpoch(consumerGroup.groupEpoch())
+ .setAssignmentEpoch(consumerGroup.assignmentEpoch())
+ .setAssignorName(
+ consumerGroup.preferredServerAssignor().isPresent() ?
+ consumerGroup.preferredServerAssignor().get() :
null
Review Comment:
I think yes though we need to confirm with @dajac
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
response.data.errorCode)
}
+ @Test
+ def testConsumerGroupDescribe(): Unit = {
+ val groupId = "group0"
+ val consumerGroupDescribeRequestData = new
ConsumerGroupDescribeRequestData()
+ consumerGroupDescribeRequestData.groupIds.add(groupId)
+ val requestChannelRequest = buildRequest(new
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData,
true).build())
+
+ val future = new
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.consumerGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
+
+ createKafkaApis(
+ overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp ->
"true")
+ ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ val response =
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+ val describedGroups = List(new DescribedGroup()).asJava
+ val consumerGroupDescribeResponseData = new
ConsumerGroupDescribeResponseData()
+ .setGroups(describedGroups)
+ future.complete(describedGroups)
+
+ assertEquals(consumerGroupDescribeResponseData, response.data)
+ }
+
Review Comment:
Could we add some extra tests covering a request with unauthorized groups and a
future that completes with an exception?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1046,6 +1047,112 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+ @Test
+ public void testConsumerGroupDescribe() throws InterruptedException,
ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 2;
+ service.startup(() -> partitionCount);
+
+ ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new
ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-id-1");
+ ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new
ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-id-2");
+ List<ConsumerGroupDescribeResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList(
+ describedGroup1,
+ describedGroup2
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("consumer-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+ CompletableFuture<Object> describedGroupFuture = new
CompletableFuture<>();
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("consumer-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+ )).thenReturn(describedGroupFuture);
+
+
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
future =
+
service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE),
Arrays.asList("group-id-1", "group-id-2"));
+
+ assertFalse(future.isDone());
+
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+ assertEquals(expectedDescribedGroups, future.get());
+ }
+
+ @Test
+ public void testConsumerGroupDescribeInvalidGroupId() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 1;
+ service.startup(() -> partitionCount);
+
+ ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new
ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("");
+ List<ConsumerGroupDescribeResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList(
+ new ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ .setErrorMessage(Errors.INVALID_GROUP_ID.message()),
+ describedGroup
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("consumer-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
+
+
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
future =
+
service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE),
Arrays.asList("", null));
+
+ assertEquals(expectedDescribedGroups, future.get());
+ }
+
+ @Test
+ public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 1;
+ service.startup(() -> partitionCount);
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("consumer-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
future =
+
service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE),
Collections.singletonList("group-id"));
+
+ assertEquals(
+ Collections.singletonList(new
ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ .setErrorMessage(Errors.COORDINATOR_LOAD_IN_PROGRESS.message())
+ ),
+ future.get()
+ );
+ }
+
Review Comment:
Could we add another test when the coordinator is not active?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -497,6 +499,66 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return future;
}
+ /**
+ * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext,
List)}.
+ */
+ @Override
+ public
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
consumerGroupDescribe(
+ RequestContext context,
+ List<String> groupIds
+ ) {
+ if (!isActive.get()) {
+ return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+ }
Review Comment:
We should return a completed future with a response whose error code is set to
Errors.COORDINATOR_NOT_AVAILABLE.code(). We'll fix this in the other APIs in
https://github.com/apache/kafka/pull/14589
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
response.data.errorCode)
}
+ @Test
+ def testConsumerGroupDescribe(): Unit = {
+ val groupId = "group0"
+ val consumerGroupDescribeRequestData = new
ConsumerGroupDescribeRequestData()
+ consumerGroupDescribeRequestData.groupIds.add(groupId)
+ val requestChannelRequest = buildRequest(new
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData,
true).build())
+
+ val future = new
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.consumerGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
+
+ createKafkaApis(
+ overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp ->
"true")
+ ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ val response =
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+ val describedGroups = List(new DescribedGroup()).asJava
+ val consumerGroupDescribeResponseData = new
ConsumerGroupDescribeResponseData()
+ .setGroups(describedGroups)
+ future.complete(describedGroups)
Review Comment:
I guess the "mock not invoked" error occurs because the future is not completed
when the response is sent (i.e. when `verifyNoThrottling` is called). As a result,
some mock involved in sending the response is not invoked, which causes the error.
Something like the following should help.
```
val describedGroups = List(new DescribedGroup()).asJava
val consumerGroupDescribeResponseData = new
ConsumerGroupDescribeResponseData()
.setGroups(describedGroups)
future.complete(describedGroups)
val response =
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
assert...
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,26 @@ public String currentAssignmentSummary() {
')';
}
+ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember() {
+ return new ConsumerGroupDescribeResponseData.Member()
+ .setMemberEpoch(memberEpoch)
+ .setMemberId(Uuid.fromString(memberId))
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(
+ assignedPartitions.entrySet().stream().map(
+ item -> new
ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(item.getKey())
+ .setPartitions(new ArrayList<>(item.getValue()))
Review Comment:
I guess we don't need to set the topic name here. Need to confirm with
@dajac
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
response.data.errorCode)
}
+ @Test
+ def testConsumerGroupDescribe(): Unit = {
+ val groupId = "group0"
+ val consumerGroupDescribeRequestData = new
ConsumerGroupDescribeRequestData()
+ consumerGroupDescribeRequestData.groupIds.add(groupId)
+ val requestChannelRequest = buildRequest(new
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData,
true).build())
+
+ val future = new
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+ when(groupCoordinator.consumerGroupDescribe(
+ any[RequestContext],
+ any[util.List[String]]
+ )).thenReturn(future)
Review Comment:
You can change it back to the way it was. This is actually invoked. See
comment on line 6226.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,26 @@ public String currentAssignmentSummary() {
')';
}
+ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember() {
+ return new ConsumerGroupDescribeResponseData.Member()
+ .setMemberEpoch(memberEpoch)
+ .setMemberId(Uuid.fromString(memberId))
+ .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(
+ assignedPartitions.entrySet().stream().map(
+ item -> new
ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(item.getKey())
+ .setPartitions(new ArrayList<>(item.getValue()))
+ ).collect(Collectors.toList())
+ ))
+ .setClientHost(clientHost)
+ .setClientId(clientId)
+ .setInstanceId(instanceId)
+ .setRackId(rackId)
+ .setSubscribedTopicNames(subscribedTopicNames)
+ .setSubscribedTopicRegex(subscribedTopicRegex);
Review Comment:
We have target assignments stored in ConsumerGroup. Can we pass it in as a
parameter, or use any other way that gives it access to the `targetAssignment` in
ConsumerGroup?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]