[ 
https://issues.apache.org/jira/browse/KAFKA-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472135#comment-16472135
 ] 

ASF GitHub Bot commented on KAFKA-5965:
---------------------------------------

guozhangwang closed pull request #4968: KAFKA-5965: Remove Deprecated 
AdminClient from Streams Resetter Tool
URL: https://github.com/apache/kafka/pull/4968
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index b0d52764b59..d7c4e435e58 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -23,8 +23,11 @@
 import joptsimple.OptionSpecBuilder;
 import kafka.utils.CommandLineUtils;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,6 +45,7 @@
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -51,6 +55,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -120,8 +125,8 @@ public int run(final String[] args,
             }
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
options.valueOf(bootstrapServerOption));
 
-            validateNoActiveConsumers(groupId, properties);
             kafkaAdminClient = (KafkaAdminClient) 
AdminClient.create(properties);
+            validateNoActiveConsumers(groupId, kafkaAdminClient);
 
             allTopics.clear();
             allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, 
TimeUnit.SECONDS));
@@ -149,18 +154,14 @@ public int run(final String[] args,
     }
 
     private void validateNoActiveConsumers(final String groupId,
-                                           final Properties properties) {
-        kafka.admin.AdminClient olderAdminClient = null;
-        try {
-            olderAdminClient = kafka.admin.AdminClient.create(properties);
-            if (!olderAdminClient.describeConsumerGroup(groupId, 
0).consumers().get().isEmpty()) {
-                throw new IllegalStateException("Consumer group '" + groupId + 
"' is still active. "
-                                                + "Make sure to stop all 
running application instances before running the reset tool.");
-            }
-        } finally {
-            if (olderAdminClient != null) {
-                olderAdminClient.close();
-            }
+                                           final AdminClient adminClient) 
throws ExecutionException, InterruptedException {
+        final DescribeConsumerGroupsResult describeResult = 
adminClient.describeConsumerGroups(Arrays.asList(groupId),
+                (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
+        final List<MemberDescription> members = 
describeResult.describedGroups().get(groupId).get().members();
+        if (!members.isEmpty()) {
+            throw new IllegalStateException("Consumer group '" + groupId + "' 
is still active "
+                    + "and has following members: " + members + ". "
+                    + "Make sure to stop all running application instances 
before running the reset tool.");
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove Deprecated AdminClient from Streams Resetter Tool
> --------------------------------------------------------
>
>                 Key: KAFKA-5965
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5965
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Bill Bejeck
>            Assignee: Alexander Fedosov
>            Priority: Major
>              Labels: newbie
>
> To break the dependency on using ZK, the {{StreamsResetter}} tool now uses 
> the {{KafkaAdminClient}} for deleting topics and the 
> {{kafka.admin.AdminClient}} for verfiying no consumer groups are active 
> before running.
> Once the {{KafkaAdminClient}} has a describe group functionality, we should 
> remove the dependency on {{kafka.admin.AdminClient}} from the 
> {{StreamsResetter}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to