This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 6eb7cf1  KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool (#4968)
6eb7cf1 is described below

commit 6eb7cf1300fc0c411ffab93de041654bc10918bf
Author: fedosov-alexander <alexander.s.fedo...@yandex.ru>
AuthorDate: Fri May 11 18:44:27 2018 +0300

    KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool (#4968)

    Removed usage of deprecated AdminClient from StreamsResetter

    No additional tests are required.

    Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../main/scala/kafka/tools/StreamsResetter.java | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index b0d5276..d7c4e43 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -23,8 +23,11 @@ import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
 import kafka.utils.CommandLineUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,6 +45,7 @@ import javax.xml.datatype.Duration;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -51,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -120,8 +125,8 @@ public class StreamsResetter {
             }
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
 
-            validateNoActiveConsumers(groupId, properties);
             kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
+            validateNoActiveConsumers(groupId, kafkaAdminClient);
 
             allTopics.clear();
             allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
@@ -149,18 +154,14 @@ public class StreamsResetter {
     }
 
     private void validateNoActiveConsumers(final String groupId,
-                                           final Properties properties) {
-        kafka.admin.AdminClient olderAdminClient = null;
-        try {
-            olderAdminClient = kafka.admin.AdminClient.create(properties);
-            if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
-                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
-                                                + "Make sure to stop all running application instances before running the reset tool.");
-            }
-        } finally {
-            if (olderAdminClient != null) {
-                olderAdminClient.close();
-            }
+                                           final AdminClient adminClient) throws ExecutionException, InterruptedException {
+        final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId),
+                (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
+        final List<MemberDescription> members = describeResult.describedGroups().get(groupId).get().members();
+        if (!members.isEmpty()) {
+            throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
+                    + "and has following members: " + members + ". "
+                    + "Make sure to stop all running application instances before running the reset tool.");
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.