Repository: kafka
Updated Branches:
  refs/heads/trunk 1fd70c7c9 -> 271f6b5ae


KAFKA-5862: Remove ZK dependency from Streams reset tool, Part I

Author: Bill Bejeck <b...@confluent.io>
Author: bbejeck <bbej...@gmail.com>

Reviewers: Matthias J. Sax <matth...@confluent.io>, Ted Yu 
<yuzhih...@gmail.com>, Guozhang Wang <wangg...@gmail.com>

Closes #3927 from 
bbejeck/KAFKA-5862_remove_zk_dependency_from_streams_reset_tool


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/271f6b5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/271f6b5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/271f6b5a

Branch: refs/heads/trunk
Commit: 271f6b5aec885d2eb348dea4de637ac269d3e1ca
Parents: 1fd70c7
Author: Bill Bejeck <b...@confluent.io>
Authored: Sat Sep 23 12:05:16 2017 +0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Sat Sep 23 12:05:16 2017 +0800

----------------------------------------------------------------------
 checkstyle/import-control-core.xml              |   1 +
 .../main/scala/kafka/tools/StreamsResetter.java | 123 +++++++++++--------
 .../integration/ResetIntegrationTest.java       |  59 ++++-----
 3 files changed, 103 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
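
The heart of the change: topic discovery and internal-topic deletion now go through the Java org.apache.kafka.clients.admin.AdminClient instead of ZkUtils and TopicCommand, so only --bootstrap-servers is required and the --zookeeper argument is kept merely so existing invocations don't break. A minimal, self-contained sketch of that pattern follows (the broker address and application id are illustrative placeholders, not values from the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DeleteTopicsResult;
    import org.apache.kafka.common.KafkaFuture;

    public class StreamsResetSketch {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            final String applicationId = "my-streams-app";      // placeholder application id

            final AdminClient adminClient = AdminClient.create(props);
            try {
                // Discover topics through the broker instead of zkUtils.getAllTopics().
                final List<String> internalTopics = new ArrayList<>();
                for (final String topic : adminClient.listTopics().names().get(60, TimeUnit.SECONDS)) {
                    if (topic.startsWith(applicationId + "-")
                        && (topic.endsWith("-changelog") || topic.endsWith("-repartition"))) {
                        internalTopics.add(topic);
                    }
                }

                // Delete them through the broker instead of TopicCommand.deleteTopic(zkUtils, ...).
                final DeleteTopicsResult result = adminClient.deleteTopics(internalTopics);
                for (final Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
                    entry.getValue().get(30, TimeUnit.SECONDS);  // throws if this topic's deletion failed
                }
            } finally {
                adminClient.close(60, TimeUnit.SECONDS);
            }
        }
    }

Because deleteTopics returns a per-topic KafkaFuture, the tool can report every failed deletion before failing the run, which is exactly what doDelete does in the patch below.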


http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/checkstyle/import-control-core.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 856df58..bf06a19 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -53,6 +53,7 @@
   </subpackage>
 
   <subpackage name="tools">
+    <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="kafka.admin" />
     <allow pkg="kafka.javaapi" />
     <allow pkg="kafka.producer" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 9cf0e5c..09d0d75 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -17,19 +17,14 @@
 package kafka.tools;
 
 
-import joptsimple.OptionException;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
-import kafka.admin.AdminClient;
-import kafka.admin.TopicCommand;
-import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
 
@@ -38,8 +33,16 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
 
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
@@ -68,7 +71,7 @@ public class StreamsResetter {
     private static final int EXIT_CODE_ERROR = 1;
 
     private static OptionSpec<String> bootstrapServerOption;
-    private static OptionSpec<String> zookeeperOption;
+    private static OptionSpecBuilder zookeeperOption;
     private static OptionSpec<String> applicationIdOption;
     private static OptionSpec<String> inputTopicsOption;
     private static OptionSpec<String> intermediateTopicsOption;
@@ -89,52 +92,57 @@ public class StreamsResetter {
 
         int exitCode = EXIT_CODE_SUCCESS;
 
-        AdminClient adminClient = null;
-        ZkUtils zkUtils = null;
+        KafkaAdminClient kafkaAdminClient = null;
+
         try {
             parseArguments(args);
             dryRun = options.has(dryRunOption);
 
-            adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
             final String groupId = options.valueOf(applicationIdOption);
 
+            validateNoActiveConsumers(groupId);
 
-            zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
-                30000,
-                30000,
-                JaasUtils.isZkSecurityEnabled());
+            final Properties adminClientProperties = new Properties();
+            adminClientProperties.put("bootstrap.servers", options.valueOf(bootstrapServerOption));
+            kafkaAdminClient = (KafkaAdminClient) AdminClient.create(adminClientProperties);
 
             allTopics.clear();
-            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-
-
-            if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
-                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
-                            "Make sure to stop all running application instances before running the reset tool.");
-            }
+            allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
 
             if (dryRun) {
                 System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
             }
             maybeResetInputAndSeekToEndIntermediateTopicOffsets();
-            maybeDeleteInternalTopics(zkUtils);
+            maybeDeleteInternalTopics(kafkaAdminClient);
 
         } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
             System.err.println("ERROR: " + e);
             e.printStackTrace(System.err);
         } finally {
-            if (adminClient != null) {
-                adminClient.close();
-            }
-            if (zkUtils != null) {
-                zkUtils.close();
+            if (kafkaAdminClient != null) {
+                kafkaAdminClient.close(60, TimeUnit.SECONDS);
             }
         }
 
         return exitCode;
     }
 
+    private void validateNoActiveConsumers(final String groupId) {
+        kafka.admin.AdminClient olderAdminClient = null;
+        try {
+            olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
+            if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
+                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
+                                                + "Make sure to stop all running application instances before running the reset tool.");
+            }
+        } finally {
+            if (olderAdminClient != null) {
+                olderAdminClient.close();
+            }
+        }
+    }
+
     private void parseArguments(final String[] args) throws IOException {
 
         final OptionParser optionParser = new OptionParser(false);
@@ -148,11 +156,8 @@ public class StreamsResetter {
             .ofType(String.class)
             .defaultsTo("localhost:9092")
             .describedAs("urls");
-        zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper url with format: HOST:POST")
-            .withRequiredArg()
-            .ofType(String.class)
-            .defaultsTo("localhost:2181")
-            .describedAs("url");
+        zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
+
         inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.")
             .withRequiredArg()
             .ofType(String.class)
@@ -314,30 +319,46 @@ public class StreamsResetter {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final ZkUtils zkUtils) {
+    private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient) {
 
         System.out.println("Deleting all internal/auto-created topics for 
application " + options.valueOf(applicationIdOption));
-
-        for (final String topic : allTopics) {
-            if (isInternalTopic(topic)) {
-                try {
-                    if (!dryRun) {
-                        final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
-                            "--zookeeper", options.valueOf(zookeeperOption),
-                            "--delete", "--topic", topic});
-                        TopicCommand.deleteTopic(zkUtils, commandOptions);
-                    } else {
-                        System.out.println("Topic: " + topic);
-                    }
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Deleting topic " + topic + " failed.");
-                    throw e;
+        List<String> topicsToDelete = new ArrayList<>();
+        for (final String listing : allTopics) {
+            if (isInternalTopic(listing)) {
+                if (!dryRun) {
+                    topicsToDelete.add(listing);
+                } else {
+                    System.out.println("Topic: " + listing);
                 }
             }
         }
+        if (!dryRun) {
+            doDelete(topicsToDelete, adminClient);
+        }
         System.out.println("Done.");
     }
 
+    private void doDelete(final List<String> topicsToDelete,
+                          final KafkaAdminClient adminClient) {
+        boolean hasDeleteErrors = false;
+        final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
+        final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+
+        for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
+            try {
+                entry.getValue().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                System.err.println("ERROR: deleting topic " + entry.getKey());
+                e.printStackTrace(System.err);
+                hasDeleteErrors = true;
+            }
+        }
+        if (hasDeleteErrors) {
+            throw new RuntimeException("Encountered an error deleting one or more topics");
+        }
+    }
+
+
     private boolean isInternalTopic(final String topicName) {
         return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
             && (topicName.endsWith("-changelog") || 
topicName.endsWith("-repartition"));
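
Note that one piece of the old Scala client survives in this patch: validateNoActiveConsumers still uses kafka.admin.AdminClient, presumably because the Java AdminClient did not yet offer an equivalent consumer-group lookup (hence "Part I"). A small sketch of that guard, mirroring the method above (broker address and group id are placeholders):

    import kafka.admin.AdminClient;

    public final class ActiveConsumerGuard {
        public static void ensureInactive(final String bootstrapServers, final String groupId) {
            // Group inspection still goes through the old Scala client in this patch.
            AdminClient adminClient = null;
            try {
                adminClient = AdminClient.createSimplePlaintext(bootstrapServers);
                if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
                    throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
                        + "Make sure to stop all running application instances before running the reset tool.");
                }
            } finally {
                if (adminClient != null) {
                    adminClient.close();
                }
            }
        }
    }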

http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 897028d..d76f5da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -16,19 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
-import kafka.admin.AdminClient;
-import kafka.server.KafkaConfig$;
-import kafka.tools.StreamsResetter;
-import kafka.utils.MockTime;
-import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -56,6 +51,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import kafka.admin.AdminClient;
+import kafka.server.KafkaConfig$;
+import kafka.tools.StreamsResetter;
+import kafka.utils.MockTime;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -94,6 +95,7 @@ public class ResetIntegrationTest {
 
     private static int testNo = 0;
     private static AdminClient adminClient = null;
+    private static KafkaAdminClient kafkaAdminClient = null;
 
     private final MockTime mockTime = CLUSTER.time;
     private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
@@ -104,6 +106,11 @@ public class ResetIntegrationTest {
             adminClient.close();
             adminClient = null;
         }
+
+        if (kafkaAdminClient != null) {
+            kafkaAdminClient.close(10, TimeUnit.SECONDS);
+            kafkaAdminClient = null;
+        }
     }
 
     @Before
@@ -114,6 +121,12 @@ public class ResetIntegrationTest {
             adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
         }
 
+        if (kafkaAdminClient == null) {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", CLUSTER.bootstrapServers());
+            kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(props);
+        }
+
         // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
         while (true) {
             Thread.sleep(50);
@@ -338,20 +351,20 @@ public class ResetIntegrationTest {
     }
 
     private void cleanGlobal(final String intermediateUserTopic) {
+        // leaving --zookeeper arg here to ensure tool works if users add it
         final String[] parameters;
         if (intermediateUserTopic != null) {
             parameters = new String[]{
                 "--application-id", APP_ID + testNo,
                 "--bootstrap-servers", CLUSTER.bootstrapServers(),
-                "--zookeeper", CLUSTER.zKConnectString(),
                 "--input-topics", INPUT_TOPIC,
-                "--intermediate-topics", INTERMEDIATE_USER_TOPIC
+                "--intermediate-topics", INTERMEDIATE_USER_TOPIC,
+                "--zookeeper", "localhost:2181"
             };
         } else {
             parameters = new String[]{
                 "--application-id", APP_ID + testNo,
                 "--bootstrap-servers", CLUSTER.bootstrapServers(),
-                "--zookeeper", CLUSTER.zKConnectString(),
                 "--input-topics", INPUT_TOPIC
             };
         }
@@ -363,7 +376,7 @@ public class ResetIntegrationTest {
         Assert.assertEquals(0, exitCode);
     }
 
-    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
+    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
         final Set<String> expectedRemainingTopicsAfterCleanup = new 
HashSet<>();
         expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
         if (intermediateUserTopic != null) {
@@ -374,25 +387,13 @@ public class ResetIntegrationTest {
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
         expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
 
-        Set<String> allTopics;
-        ZkUtils zkUtils = null;
-        try {
-            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
-                30000,
-                30000,
-                JaasUtils.isZkSecurityEnabled());
-
-            do {
-                Utils.sleep(100);
-                allTopics = new HashSet<>();
-                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-            } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size());
-        } finally {
-            if (zkUtils != null) {
-                zkUtils.close();
-            }
-        }
+        final Set<String> allTopics = new HashSet<>();
+
+        final ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
+        listTopicsOptions.listInternal(true);
+        allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS));
         assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
+
     }
 
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {
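
On the test side, the ZooKeeper polling loop over zkUtils.getAllTopics() is replaced by a single listTopics call that also returns internal topics such as __consumer_offsets. A sketch of that verification step, assuming an AdminClient created as in the test's setup (the helper name is illustrative):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListTopicsOptions;

    final class TopicCleanupCheck {
        // Lists every topic the broker knows about, including internal ones.
        static Set<String> remainingTopics(final AdminClient adminClient) throws Exception {
            final ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(true);   // include internal topics such as __consumer_offsets
            return new HashSet<>(adminClient.listTopics(options).names().get(30, TimeUnit.SECONDS));
        }
    }

The test then asserts that this set equals the expected remaining user topics, as assertInternalTopicsGotDeleted does above.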
