[ https://issues.apache.org/jira/browse/KAFKA-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472135#comment-16472135 ]
ASF GitHub Bot commented on KAFKA-5965: --------------------------------------- guozhangwang closed pull request #4968: KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool URL: https://github.com/apache/kafka/pull/4968 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index b0d52764b59..d7c4e435e58 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -23,8 +23,11 @@ import joptsimple.OptionSpecBuilder; import kafka.utils.CommandLineUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -42,6 +45,7 @@ import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -51,6 +55,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -120,8 +125,8 @@ public int run(final String[] args, } properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption)); - validateNoActiveConsumers(groupId, 
properties); kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties); + validateNoActiveConsumers(groupId, kafkaAdminClient); allTopics.clear(); allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS)); @@ -149,18 +154,14 @@ public int run(final String[] args, } private void validateNoActiveConsumers(final String groupId, - final Properties properties) { - kafka.admin.AdminClient olderAdminClient = null; - try { - olderAdminClient = kafka.admin.AdminClient.create(properties); - if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " - + "Make sure to stop all running application instances before running the reset tool."); - } - } finally { - if (olderAdminClient != null) { - olderAdminClient.close(); - } + final AdminClient adminClient) throws ExecutionException, InterruptedException { + final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Arrays.asList(groupId), + (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000)); + final List<MemberDescription> members = describeResult.describedGroups().get(groupId).get().members(); + if (!members.isEmpty()) { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active " + + "and has following members: " + members + ". " + + "Make sure to stop all running application instances before running the reset tool."); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove Deprecated AdminClient from Streams Resetter Tool > -------------------------------------------------------- > > Key: KAFKA-5965 > URL: https://issues.apache.org/jira/browse/KAFKA-5965 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.0.0 > Reporter: Bill Bejeck > Assignee: Alexander Fedosov > Priority: Major > Labels: newbie > > To break the dependency on using ZK, the {{StreamsResetter}} tool now uses > the {{KafkaAdminClient}} for deleting topics and the > {{kafka.admin.AdminClient}} for verifying no consumer groups are active > before running. > Once the {{KafkaAdminClient}} has describe-group functionality, we should > remove the dependency on {{kafka.admin.AdminClient}} from the > {{StreamsResetter}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)