hachikuji commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r426867775
##########
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##########
@@ -20,8 +20,14 @@
// Version 1 and 2 are the same as version 0.
//
// Version 3 is the first flexible version.
- "validVersions": "0-3",
+ //
+ // Version 4 adds the States flexible field (KIP-518).
+ "validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
+ { "name": "States", "type": "[]string", "versions": "4+", "tag": 0,
"taggedVersions": "4+",
Review comment:
Sorry I missed this from the discussion, but why are we bumping the
version if we are only adding tagged fields? Is it so that we can detect
whether the capability is supported? If so, then I wonder why we don't make
this a regular field.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,34 @@
*/
@InterfaceStability.Evolving
public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroupsOptions> {
+
+ private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+ /**
+ * Only groups in these states will be returned by listConsumerGroups()
Review comment:
Probably worth adding a comment about broker compatibility with this API.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,34 @@
*/
@InterfaceStability.Evolving
public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroupsOptions> {
+
+ private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+ /**
+ * Only groups in these states will be returned by listConsumerGroups()
+ * If not set, all groups are returned without their states
+ * throw IllegalArgumentException if states is empty
+ */
+ public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+ if (states == null || states.isEmpty()) {
+ throw new IllegalArgumentException("states should not be null or
empty");
+ }
+ this.states = Optional.of(states);
+ return this;
+ }
+
+ /**
+ * All groups with their states will be returned by listConsumerGroups()
+ */
+ public ListConsumerGroupsOptions inAnyState() {
+ this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));
Review comment:
Hmm.. We have an `UNKNOWN` state in `ConsumerGroupState` in case the
group coordinator adds a new state that the client isn't aware of. Currently
we're going to pass this through the request, which is a bit odd. Furthermore,
if the coordinator _does_ add new states, we will be unable to see them using
this API. I think it might be better to use a `null` list of states in the
request to indicate that any state is needed.
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1397,29 +1398,32 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
- val (error, groups) = groupCoordinator.handleListGroups()
+ val listGroupsRequest = request.body[ListGroupsRequest]
+ val states = listGroupsRequest.data.states.asScala.toList
+
+ def createResponse(throttleMs: Int, groups: List[GroupOverview], error:
Errors): AbstractResponse = {
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(error.code)
+ .setGroups(groups.map { group =>
+ val listedGroup = new ListGroupsResponseData.ListedGroup()
+ .setGroupId(group.groupId)
+ .setProtocolType(group.protocolType)
+ if (!states.isEmpty)
Review comment:
Why don't we always return the state? I don't think overhead is a huge
concern for an api like this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]