This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 16ad358 KAFKA-6868; Fix buffer underflow and expose group state in the consumer groups API (#4980) 16ad358 is described below commit 16ad358d64a138fc4b455379745ae1550a93d57b Author: Colin Patrick McCabe <co...@cmccabe.xyz> AuthorDate: Mon May 21 08:37:35 2018 -0700 KAFKA-6868; Fix buffer underflow and expose group state in the consumer groups API (#4980) * The consumer groups API should expose group state and coordinator information. This information is needed by administrative tools and scripts that access consumer groups. * The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer. * Remove nulls from the API and make all collections immutable. * DescribeConsumerGroupsResult#all should return a result as expected, rather than Void * Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as "The group id The group id does not exist was not found" and similar. 
Reviewers: Attila Sasvari <asasv...@apache.org>, Andras Beni <andrasb...@cloudera.com>, Dong Lin <lindon...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../clients/admin/ConsumerGroupDescription.java | 78 +++++++++----- .../admin/DescribeConsumerGroupsResult.java | 26 ++++- .../kafka/clients/admin/KafkaAdminClient.java | 51 +++++---- .../kafka/clients/admin/MemberAssignment.java | 13 ++- .../kafka/clients/admin/MemberDescription.java | 45 ++++---- .../apache/kafka/common/ConsumerGroupState.java | 61 +++++++++++ .../common/errors/GroupIdNotFoundException.java | 12 +-- .../common/errors/GroupNotEmptyException.java | 12 +-- .../main/scala/kafka/tools/StreamsResetter.java | 3 +- .../kafka/api/AdminClientIntegrationTest.scala | 120 ++++++++++++++++++++- 10 files changed, 315 insertions(+), 106 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 0bfa8a7..bc3857d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -17,55 +17,56 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.Node; import org.apache.kafka.common.utils.Utils; -import java.util.List; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; /** * A detailed description of a single consumer group in the cluster. 
*/ public class ConsumerGroupDescription { - private final String groupId; private final boolean isSimpleConsumerGroup; - private final List<MemberDescription> members; + private final Collection<MemberDescription> members; private final String partitionAssignor; + private final ConsumerGroupState state; + private final Node coordinator; - /** - * Creates an instance with the specified parameters. - * - * @param groupId The consumer group id - * @param isSimpleConsumerGroup If Consumer Group is simple - * @param members The consumer group members - * @param partitionAssignor The consumer group partition assignor - */ - public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, List<MemberDescription> members, String partitionAssignor) { - this.groupId = groupId; + ConsumerGroupDescription(String groupId, + boolean isSimpleConsumerGroup, + Collection<MemberDescription> members, + String partitionAssignor, + ConsumerGroupState state, + Node coordinator) { + this.groupId = groupId == null ? "" : groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; - this.members = members; - this.partitionAssignor = partitionAssignor; + this.members = members == null ? Collections.<MemberDescription>emptyList() : + Collections.unmodifiableList(new ArrayList<>(members)); + this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor; + this.state = state; + this.coordinator = coordinator; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - ConsumerGroupDescription that = (ConsumerGroupDescription) o; - - if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false; - if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false; - if (members != null ? !members.equals(that.members) : that.members != null) return false; - return partitionAssignor != null ? 
partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor == null; + return isSimpleConsumerGroup == that.isSimpleConsumerGroup && + groupId.equals(that.groupId) && + members.equals(that.members) && + partitionAssignor.equals(that.partitionAssignor) && + state.equals(that.state); } @Override public int hashCode() { - int result = groupId != null ? groupId.hashCode() : 0; - result = 31 * result + (isSimpleConsumerGroup ? 1 : 0); - result = 31 * result + (members != null ? members.hashCode() : 0); - result = 31 * result + (partitionAssignor != null ? partitionAssignor.hashCode() : 0); - return result; + return Objects.hash(isSimpleConsumerGroup, groupId, members, partitionAssignor, state); } /** @@ -85,7 +86,7 @@ public class ConsumerGroupDescription { /** * A list of the members of the consumer group. */ - public List<MemberDescription> members() { + public Collection<MemberDescription> members() { return members; } @@ -96,9 +97,28 @@ public class ConsumerGroupDescription { return partitionAssignor; } + /** + * The consumer group state, or UNKNOWN if the state is too new for us to parse. + */ + public ConsumerGroupState state() { + return state; + } + + /** + * The consumer group coordinator, or null if the coordinator is not known. 
+ */ + public Node coordinator() { + return coordinator; + } + @Override public String toString() { - return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + ", members=" + - Utils.join(members, ",") + ", partitionAssignor=" + partitionAssignor + ")"; + return "(groupId=" + groupId + + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + + ", members=" + Utils.join(members, ",") + + ", partitionAssignor=" + partitionAssignor + + ", state=" + state + + ", coordinator=" + coordinator + + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java index ac2189c..8f0ebad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java @@ -21,7 +21,9 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; /** @@ -39,16 +41,32 @@ public class DescribeConsumerGroupsResult { } /** - * Return a map from group id to futures which can be used to check the description of a consumer group. + * Return a map from group id to futures which yield group descriptions. */ public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups() { return futures; } /** - * Return a future which succeeds only if all the consumer group description succeed. + * Return a future which yields all ConsumerGroupDescription objects, if all the describes succeed. 
*/ - public KafkaFuture<Void> all() { - return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + public KafkaFuture<Map<String, ConsumerGroupDescription>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( + new KafkaFuture.BaseFunction<Void, Map<String, ConsumerGroupDescription>>() { + @Override + public Map<String, ConsumerGroupDescription> apply(Void v) { + try { + Map<String, ConsumerGroupDescription> descriptions = new HashMap<>(futures.size()); + for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> entry : futures.entrySet()) { + descriptions.put(entry.getKey(), entry.getValue().get()); + } + return descriptions; + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, since the KafkaFuture#allOf already ensured + // that all of the futures completed successfully. + throw new RuntimeException(e); + } + } + }); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index c9e0e18..5f4eefe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; @@ -120,11 +121,9 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import 
org.slf4j.Logger; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -2347,14 +2346,21 @@ public class KafkaAdminClient extends AdminClient { @Override void handleResponse(AbstractResponse abstractResponse) { - final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; + final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse; + Errors error = fcResponse.error(); + if (error == Errors.COORDINATOR_NOT_AVAILABLE) { + // Retry COORDINATOR_NOT_AVAILABLE, in case the error is temporary. + throw error.exception(); + } else if (error != Errors.NONE) { + // All other errors are immediate failures. + KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); + future.completeExceptionally(error.exception()); + return; + } final long nowDescribeConsumerGroups = time.milliseconds(); - - final int nodeId = response.node().id(); - + final int nodeId = fcResponse.node().id(); runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { - @Override AbstractRequest.Builder createRequest(int timeoutMs) { return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)); @@ -2375,24 +2381,29 @@ public class KafkaAdminClient extends AdminClient { final String protocolType = groupMetadata.protocolType(); if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members(); - final List<MemberDescription> consumers = new ArrayList<>(members.size()); + final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size()); for (DescribeGroupsResponse.GroupMember groupMember : members) { - final PartitionAssignor.Assignment assignment = - ConsumerProtocol.deserializeAssignment( - ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment()))); - + Set<TopicPartition> 
partitions = Collections.emptySet(); + if (groupMember.memberAssignment().remaining() > 0) { + final PartitionAssignor.Assignment assignment = ConsumerProtocol. + deserializeAssignment(groupMember.memberAssignment().duplicate()); + partitions = new HashSet<>(assignment.partitions()); + } final MemberDescription memberDescription = - new MemberDescription( - groupMember.memberId(), - groupMember.clientId(), - groupMember.clientHost(), - new MemberAssignment(assignment.partitions())); - consumers.add(memberDescription); + new MemberDescription(groupMember.memberId(), + groupMember.clientId(), + groupMember.clientHost(), + new MemberAssignment(partitions)); + memberDescriptions.add(memberDescription); } - final String protocol = groupMetadata.protocol(); final ConsumerGroupDescription consumerGroupDescription = - new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol); + new ConsumerGroupDescription(groupId, + protocolType.isEmpty(), + memberDescriptions, + groupMetadata.protocol(), + ConsumerGroupState.parse(groupMetadata.state()), + fcResponse.node()); future.complete(consumerGroupDescription); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java index bd95813..6c180ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java @@ -19,21 +19,24 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; -import java.util.List; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; /** * A description of the assignments of a specific group member. 
*/ public class MemberAssignment { - private final List<TopicPartition> topicPartitions; + private final Set<TopicPartition> topicPartitions; /** * Creates an instance with the specified parameters. * * @param topicPartitions List of topic partitions */ - public MemberAssignment(List<TopicPartition> topicPartitions) { - this.topicPartitions = topicPartitions; + MemberAssignment(Set<TopicPartition> topicPartitions) { + this.topicPartitions = topicPartitions == null ? Collections.<TopicPartition>emptySet() : + Collections.unmodifiableSet(new HashSet<>(topicPartitions)); } @Override @@ -54,7 +57,7 @@ public class MemberAssignment { /** * The topic partitions assigned to a group member. */ - public List<TopicPartition> topicPartitions() { + public Set<TopicPartition> topicPartitions() { return topicPartitions; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java index 2ba1963..895abad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -17,49 +17,42 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collections; +import java.util.Objects; + /** * A detailed description of a single group instance in the cluster. */ public class MemberDescription { - private final String memberId; private final String clientId; private final String host; private final MemberAssignment assignment; - /** - * Creates an instance with the specified parameters. 
- * - * @param memberId The consumer id - * @param clientId The client id - * @param host The host - * @param assignment The assignment - */ - public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) { - this.memberId = memberId; - this.clientId = clientId; - this.host = host; - this.assignment = assignment; + MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) { + this.memberId = memberId == null ? "" : memberId; + this.clientId = clientId == null ? "" : clientId; + this.host = host == null ? "" : host; + this.assignment = assignment == null ? + new MemberAssignment(Collections.<TopicPartition>emptySet()) : assignment; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - MemberDescription that = (MemberDescription) o; - - if (memberId != null ? !memberId.equals(that.memberId) : that.memberId != null) return false; - if (clientId != null ? !clientId.equals(that.clientId) : that.clientId != null) return false; - return assignment != null ? assignment.equals(that.assignment) : that.assignment == null; + return memberId.equals(that.memberId) && + clientId.equals(that.clientId) && + host.equals(that.host) && + assignment.equals(that.assignment); } @Override public int hashCode() { - int result = memberId != null ? memberId.hashCode() : 0; - result = 31 * result + (clientId != null ? clientId.hashCode() : 0); - result = 31 * result + (assignment != null ? 
assignment.hashCode() : 0); - return result; + return Objects.hash(memberId, clientId, host, assignment); } /** @@ -92,7 +85,9 @@ public class MemberDescription { @Override public String toString() { - return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" + host + ", assignment=" + - assignment + ")"; + return "(memberId=" + memberId + + ", clientId=" + clientId + + ", host=" + host + + ", assignment=" + assignment + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java new file mode 100644 index 0000000..7f3d4f0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common; + +import java.util.HashMap; + +/** + * The consumer group state. 
+ */ +public enum ConsumerGroupState { + UNKNOWN("Unknown"), + PREPARING_REBALANCE("PreparingRebalance"), + COMPLETING_REBALANCE("CompletingRebalance"), + STABLE("Stable"), + DEAD("Dead"), + EMPTY("Empty"); + + private final static HashMap<String, ConsumerGroupState> NAME_TO_ENUM; + + static { + NAME_TO_ENUM = new HashMap<>(); + for (ConsumerGroupState state : ConsumerGroupState.values()) { + NAME_TO_ENUM.put(state.name, state); + } + } + + private final String name; + + ConsumerGroupState(String name) { + this.name = name; + } + + + /** + * Parse a string into a consumer group state. + */ + public static ConsumerGroupState parse(String name) { + ConsumerGroupState state = NAME_TO_ENUM.get(name); + return state == null ? UNKNOWN : state; + } + + @Override + public String toString() { + return name; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java index 1ff30f1..a4d509d 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java @@ -17,15 +17,7 @@ package org.apache.kafka.common.errors; public class GroupIdNotFoundException extends ApiException { - private final String groupId; - - public GroupIdNotFoundException(String groupId) { - super("The group id " + groupId + " was not found"); - this.groupId = groupId; - } - - public String groupId() { - return groupId; + public GroupIdNotFoundException(String message) { + super(message); } - } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java index 264e613..e15b3e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java +++ 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java @@ -17,15 +17,7 @@ package org.apache.kafka.common.errors; public class GroupNotEmptyException extends ApiException { - private final String groupId; - - public GroupNotEmptyException(String groupId) { - super("The group " + groupId + " is not empty"); - this.groupId = groupId; - } - - public String groupId() { - return groupId; + public GroupNotEmptyException(String message) { + super(message); } - } diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index d7c4e43..3c045c6 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -157,7 +157,8 @@ public class StreamsResetter { final AdminClient adminClient) throws ExecutionException, InterruptedException { final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId), (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000)); - final List<MemberDescription> members = describeResult.describedGroups().get(groupId).get().members(); + final List<MemberDescription> members = + new ArrayList<MemberDescription>(describeResult.describedGroups().get(groupId).get().members()); if (!members.isEmpty()) { throw new IllegalStateException("Consumer group '" + groupId + "' is still active " + "and has following members: " + members + ". 
" diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index b31c09d..e7dd108 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -31,10 +31,10 @@ import org.apache.kafka.clients.admin._ import kafka.utils.{Logging, TestUtils} import kafka.utils.Implicits._ import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.{KafkaFuture, TopicPartition, TopicPartitionReplica} +import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, TopicPartition, TopicPartitionReplica} import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ @@ -49,6 +49,7 @@ import scala.collection.JavaConverters._ import java.lang.{Long => JLong} import kafka.zk.KafkaZkClient +import org.apache.kafka.common.internals.Topic import org.scalatest.Assertions.intercept import scala.concurrent.duration.Duration @@ -98,6 +99,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value) config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") + config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") // We set this in order to test that we don't expose sensitive data via describe configs. This will already be // set for subclasses with security enabled and we don't want to overwrite it. 
if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp)) @@ -959,6 +961,120 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { client.close() assertEquals(1, factory.failuresInjected) } + + /** + * Test the consumer group APIs. + */ + @Test + def testConsumerGroups(): Unit = { + val config = createConfig() + val client = AdminClient.create(config) + try { + // Verify that initially there are no consumer groups to list. + val list1 = client.listConsumerGroups() + assertTrue(0 == list1.all().get().size()) + assertTrue(0 == list1.errors().get().size()) + assertTrue(0 == list1.valid().get().size()) + val testTopicName = "test_topic" + val testNumPartitions = 2 + client.createTopics(Collections.singleton( + new NewTopic(testTopicName, testNumPartitions, 1))).all().get() + val producer = createNewProducer + try { + producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() + } finally { + Utils.closeQuietly(producer, "producer") + } + val testGroupId = "test_group_id" + val testClientId = "test_client_id" + val fakeGroupId = "fake_group_id" + val newConsumerConfig = new Properties(consumerConfig) + newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + val consumer = TestUtils.createNewConsumer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = this.clientSaslProperties, + props = Some(newConsumerConfig)) + try { + // Start a consumer in a thread that will subscribe to a new group. + val consumerThread = new Thread { + override def run { + consumer.subscribe(Collections.singleton(testTopicName)) + while (true) { + consumer.poll(5000) + consumer.commitSync() + } + } + } + try { + consumerThread.start + // Test that we can list the new group. + TestUtils.waitUntilTrue(() => { + val matching = client.listConsumerGroups().all().get().asScala. 
+ filter(listing => listing.groupId().equals(testGroupId)) + !matching.isEmpty + }, s"Expected to be able to list $testGroupId") + + val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) + assertEquals(2, result.describedGroups().size()) + + // Test that we can get information about the test consumer group. + assertTrue(result.describedGroups().containsKey(testGroupId)) + val testGroupDescription = result.describedGroups().get(testGroupId).get() + assertEquals(testGroupId, testGroupDescription.groupId()) + assertFalse(testGroupDescription.isSimpleConsumerGroup()) + assertEquals(1, testGroupDescription.members().size()) + val member = testGroupDescription.members().iterator().next() + assertEquals(testClientId, member.clientId()) + val topicPartitions = member.assignment().topicPartitions() + assertEquals(testNumPartitions, topicPartitions.size()) + assertEquals(testNumPartitions, topicPartitions.asScala. + count(tp => tp.topic().equals(testTopicName))) + + // Test that the fake group is listed as dead. 
+ assertTrue(result.describedGroups().containsKey(fakeGroupId)) + val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get() + assertEquals(fakeGroupId, fakeGroupDescription.groupId()) + assertEquals(0, fakeGroupDescription.members().size()) + assertEquals("", fakeGroupDescription.partitionAssignor()) + assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state()) + + // Test that all() returns 2 results + assertEquals(2, result.all().get().size()) + + // Test listConsumerGroupOffsets + val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() + TestUtils.waitUntilTrue(() => { + val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() + val part = new TopicPartition(testTopicName, 0) + parts.containsKey(part) && (parts.get(part).offset() == 1) + }, s"Expected the offset for partition 0 to eventually become 1.") + + // Test consumer group deletion + val deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) + assertEquals(2, deleteResult.deletedGroups().size()) + + // Deleting the fake group ID should get GroupIdNotFoundException. + assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId)) + assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId), + classOf[GroupIdNotFoundException]) + + // Deleting the real group ID should get GroupNotEmptyException + assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) + assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId), + classOf[GroupNotEmptyException]) + } finally { + consumerThread.interrupt() + consumerThread.join() + } + } finally { + Utils.closeQuietly(consumer, "consumer") + } + } finally { + Utils.closeQuietly(client, "adminClient") + } + } } object AdminClientIntegrationTest { -- To stop receiving notification emails like this one, please contact j...@apache.org.