This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f309299f3cf KAFKA-14503: Implement ListGroups (#14271)
f309299f3cf is described below
commit f309299f3cf92c3ed6fe545c628117b9028c2917
Author: zhaohaidao <[email protected]>
AuthorDate: Fri Sep 15 14:45:03 2023 +0800
KAFKA-14503: Implement ListGroups (#14271)
This patch implements the ListGroups API in the new group coordinator.
Reviewers: David Jacot <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/coordinator/group/Group.java | 11 ++
.../coordinator/group/GroupCoordinatorService.java | 34 ++++-
.../coordinator/group/GroupCoordinatorShard.java | 17 +++
.../coordinator/group/GroupMetadataManager.java | 20 +++
.../coordinator/group/consumer/ConsumerGroup.java | 26 ++++
.../coordinator/group/generic/GenericGroup.java | 14 +-
.../group/GroupCoordinatorServiceTest.java | 148 +++++++++++++++++++++
.../group/GroupMetadataManagerTest.java | 87 +++++++++++-
.../group/consumer/ConsumerGroupTest.java | 14 ++
10 files changed, 365 insertions(+), 8 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6e946b1bead..9862a237fa9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -326,7 +326,7 @@
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
- files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
+ files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 44290ae7fe7..cbd25cb0700 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
/**
* Interface common for all groups.
@@ -48,6 +49,16 @@ public interface Group {
*/
String stateAsString();
+ /**
+ * @return The {{@link GroupType}}'s String representation based on the committed offset.
+ */
+ String stateAsString(long committedOffset);
+
+ /**
+ * @return the group formatted as a list group response based on the committed offset.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset);
+
/**
* @return The group id.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index ac3aa9d45b4..9e11cfa63e7 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
@@ -73,12 +74,15 @@ import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
@@ -460,9 +464,33 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
+ final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
+ final Set<TopicPartition> existingPartitionSet = runtime.partitions();
+ final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size());
+
+ for (TopicPartition tp : existingPartitionSet) {
+ runtime.scheduleReadOperation(
+ "list-groups",
+ tp,
+ (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+ ).handle((groups, exception) -> {
+ if (exception == null) {
+ synchronized (results) {
+ results.addAll(groups);
+ }
+ } else {
+ if (!(exception instanceof NotCoordinatorException)) {
+ future.complete(new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()));
+ }
+ }
+ if (cnt.decrementAndGet() == 0) {
+ future.complete(new ListGroupsResponseData().setGroups(results));
+ }
+ return null;
+ });
+ }
+ return future;
}
/**
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index ed68bb22c5a..97327c9722f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
@@ -60,6 +61,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -308,6 +310,21 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
return offsetMetadataManager.commitOffset(context, request);
}
+ /**
+ * Handles a ListGroups request.
+ *
+ * @param statesFilter The states of the groups we want to list.
+ * If empty all groups are returned with their state.
+ * @param committedOffset A specified committed offset corresponding to this shard
+ * @return A list containing the ListGroupsResponseData.ListedGroup
+ */
+ public List<ListGroupsResponseData.ListedGroup> listGroups(
+ List<String> statesFilter,
+ long committedOffset
+ ) throws ApiException {
+ return groupMetadataManager.listGroups(statesFilter, committedOffset);
+ }
+
/**
* Handles a LeaveGroup request.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a174a0fb1ff..002b7956857 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
+import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -95,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
@@ -424,6 +426,24 @@ public class GroupMetadataManager {
return group;
}
+ /**
+ * Get the Group List.
+ *
+ * @param statesFilter The states of the groups we want to list.
+ * If empty all groups are returned with their state.
+ * @param committedOffset A specified committed offset corresponding to this shard
+ *
+ * @return A list containing the ListGroupsResponseData.ListedGroup
+ */
+
+ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFilter, long committedOffset) {
+ Stream<Group> groupStream = groups.values(committedOffset).stream();
+ if (!statesFilter.isEmpty()) {
+ groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString(committedOffset)));
+ }
+ return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
+ }
+
/**
* Gets or maybe creates a consumer group.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 5328020baac..10222c3a3cc 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.coordinator.group.consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
@@ -179,6 +181,23 @@ public class ConsumerGroup implements Group {
return state.get().toString();
}
+ /**
+ * @return The current state as a String with given committedOffset.
+ */
+ public String stateAsString(long committedOffset) {
+ return state.get(committedOffset).toString();
+ }
+
+ /**
+ * @return the group formatted as a list group response based on the committed offset.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
+ return new ListGroupsResponseData.ListedGroup()
+ .setGroupId(groupId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState(state.get(committedOffset).toString());
+ }
+
/**
* @return The group id.
*/
@@ -194,6 +213,13 @@ public class ConsumerGroup implements Group {
return state.get();
}
+ /**
+ * @return The current state based on committed offset.
+ */
+ public ConsumerGroupState state(long committedOffset) {
+ return state.get(committedOffset);
+ }
+
/**
* @return The group epoch.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index d163f385fe0..18f96f2f78e 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -242,6 +242,16 @@ public class GenericGroup implements Group {
return this.state.toString();
}
+ /**
+ * The state of this group based on the committed offset.
+ *
+ * @return The current state as a String.
+ */
+ @Override
+ public String stateAsString(long committedOffset) {
+ return this.state.toString();
+ }
+
/**
* @return the group id.
*/
@@ -1167,9 +1177,9 @@ public class GenericGroup implements Group {
}
/**
- * @return the group formatted as a list group response.
+ * @return the group formatted as a list group response based on the committed offset.
*/
- public ListGroupsResponseData.ListedGroup asListedGroup() {
+ public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
return new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType(protocolType.orElse(""))
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 61af5582824..8f39f524484 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
@@ -23,6 +24,7 @@ import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -36,6 +38,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -63,10 +67,12 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
+import org.mockito.internal.util.collections.Sets;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -603,6 +609,148 @@ public class GroupCoordinatorServiceTest {
);
}
+ @Test
+ public void testListGroups() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 3;
+ service.startup(() -> partitionCount);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+
+ List<ListGroupsResponseData.ListedGroup> expectedResults =
Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group0")
+ .setGroupState("Stable")
+ .setProtocolType("protocol1"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group1")
+ .setGroupState("Empty")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group2")
+ .setGroupState("Dead")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ );
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+ for (int i = 0; i < partitionCount; i++) {
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets",
i)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+ }
+
+ CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+
+ List<ListGroupsResponseData.ListedGroup> actualResults =
responseFuture.get(5, TimeUnit.SECONDS).groups();
+ assertEquals(expectedResults, actualResults);
+ }
+
+ @Test
+ public void testListGroupsFailedWithNotCoordinatorException()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 3;
+ service.startup(() -> partitionCount);
+
+ List<ListGroupsResponseData.ListedGroup> expectedResults =
Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group0")
+ .setGroupState("Stable")
+ .setProtocolType("protocol1"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group1")
+ .setGroupState("Empty")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ );
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+ for (int i = 0; i < 2; i++) {
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets",
i)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+ }
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(new
NotCoordinatorException("")));
+
+ CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+ List<ListGroupsResponseData.ListedGroup> actualResults =
responseFuture.get(5, TimeUnit.SECONDS).groups();
+ assertEquals(expectedResults, actualResults);
+ }
+
+ @Test
+ public void testListGroupsFailedImmediately()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 3;
+ service.startup(() -> partitionCount);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+ for (int i = 0; i < 2; i++) {
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets",
i)),
+ ArgumentMatchers.any()
+ )).thenReturn(new CompletableFuture<>());
+ }
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(new
CoordinatorLoadInProgressException("")));
+
+ CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+ ListGroupsResponseData listGroupsResponseData = responseFuture.get(5,
TimeUnit.SECONDS);
+
+ assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(),
listGroupsResponseData.errorCode());
+ assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchOffsets(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3a735db37e3..a1b25575e8e 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMe
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -115,8 +116,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
@@ -455,8 +458,6 @@ public class GroupMetadataManagerTest {
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request
) {
- snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.CONSUMER_GROUP_HEARTBEAT,
@@ -1023,6 +1024,10 @@ public class GroupMetadataManagerTest {
);
}
+ public List<ListGroupsResponseData.ListedGroup> sendListGroups(List<String> statesFilter) {
+ return groupMetadataManager.listGroups(statesFilter, lastCommittedOffset);
+ }
+
public void verifyHeartbeat(
String groupId,
JoinGroupResponseData joinResponse,
@@ -1202,6 +1207,7 @@ public class GroupMetadataManagerTest {
}
lastWrittenOffset++;
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
}
}
@@ -8603,6 +8609,83 @@ public class GroupMetadataManagerTest {
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(),
heartbeatResponse.errorCode());
}
+ @Test
+ public void testListGroups() {
+ String consumerGroupId = "consumer-group-id";
+ String genericGroupId = "generic-group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String genericGroupType = "generic";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+ .build();
+ context.replay(newGroupMetadataRecord(
+ genericGroupId,
+ new GroupMetadataValue()
+ .setMembers(Collections.emptyList())
+ .setGeneration(2)
+ .setLeader(null)
+ .setProtocolType(genericGroupType)
+ .setProtocol("range")
+ .setCurrentStateTimestamp(context.time.milliseconds()),
+ MetadataVersion.latest()));
+ context.commit();
+ GenericGroup genericGroup =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId,
false);
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
+ .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .build()));
+ context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
+
+ Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
+ context.sendListGroups(Collections.emptyList())
+ .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+ Map<String, ListGroupsResponseData.ListedGroup> expectAllGroupMap =
+ Stream.of(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId(genericGroup.groupId())
+ .setProtocolType(genericGroupType)
+ .setGroupState(EMPTY.toString()),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId(consumerGroupId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
+ ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+
+ assertEquals(expectAllGroupMap, actualAllGroupMap);
+
+ context.commit();
+ actualAllGroupMap = context.sendListGroups(Collections.emptyList()).stream()
+ .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+ expectAllGroupMap =
+ Stream.of(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId(genericGroup.groupId())
+ .setProtocolType(genericGroupType)
+ .setGroupState(EMPTY.toString()),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId(consumerGroupId)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
+ ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+
+ assertEquals(expectAllGroupMap, actualAllGroupMap);
+
+ actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty")).stream()
+ .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+ expectAllGroupMap = Stream.of(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId(genericGroup.groupId())
+ .setProtocolType(genericGroupType)
+ .setGroupState(EMPTY.toString())
+ ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+
+ assertEquals(expectAllGroupMap, actualAllGroupMap);
+ }
+
public static <T> void assertUnorderedListEquals(
List<T> expected,
List<T> actual
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index aa848aa3f5d..9c9421958c5 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -632,6 +632,20 @@ public class ConsumerGroupTest {
group.validateOffsetCommit("member-id", "", 0);
}
+ @Test
+ public void testAsListedGroup() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+ ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+ snapshotRegistry.getOrCreateSnapshot(0);
+ assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
+ group.updateMember(new ConsumerGroupMember.Builder("member1")
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build());
+ snapshotRegistry.getOrCreateSnapshot(1);
+ assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
+ assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(), group.stateAsString(1));
+ }
+
@Test
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());