Repository: kafka Updated Branches: refs/heads/trunk 1fd70c7c9 -> 271f6b5ae
KAFKA-5862: Remove ZK dependency from Streams reset tool, Part I Author: Bill Bejeck <b...@confluent.io> Author: bbejeck <bbej...@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Ted Yu <yuzhih...@gmail.com>, Guozhang Wang <wangg...@gmail.com> Closes #3927 from bbejeck/KAFKA-5862_remove_zk_dependency_from_streams_reset_tool Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/271f6b5a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/271f6b5a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/271f6b5a Branch: refs/heads/trunk Commit: 271f6b5aec885d2eb348dea4de637ac269d3e1ca Parents: 1fd70c7 Author: Bill Bejeck <b...@confluent.io> Authored: Sat Sep 23 12:05:16 2017 +0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Sat Sep 23 12:05:16 2017 +0800 ---------------------------------------------------------------------- checkstyle/import-control-core.xml | 1 + .../main/scala/kafka/tools/StreamsResetter.java | 123 +++++++++++-------- .../integration/ResetIntegrationTest.java | 59 ++++----- 3 files changed, 103 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/checkstyle/import-control-core.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 856df58..bf06a19 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -53,6 +53,7 @@ </subpackage> <subpackage name="tools"> + <allow pkg="org.apache.kafka.clients.admin" /> <allow pkg="kafka.admin" /> <allow pkg="kafka.javaapi" /> <allow pkg="kafka.producer" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 9cf0e5c..09d0d75 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -17,19 +17,14 @@ package kafka.tools; -import joptsimple.OptionException; -import joptsimple.OptionParser; -import joptsimple.OptionSet; -import joptsimple.OptionSpec; -import joptsimple.OptionSpecBuilder; -import kafka.admin.AdminClient; -import kafka.admin.TopicCommand; -import kafka.utils.ZkUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; @@ -38,8 +33,16 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; + +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; /** * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. @@ -68,7 +71,7 @@ public class StreamsResetter { private static final int EXIT_CODE_ERROR = 1; private static OptionSpec<String> bootstrapServerOption; - private static OptionSpec<String> zookeeperOption; + private static OptionSpecBuilder zookeeperOption; private static OptionSpec<String> applicationIdOption; private static OptionSpec<String> inputTopicsOption; private static OptionSpec<String> intermediateTopicsOption; @@ -89,52 +92,57 @@ public class StreamsResetter { int exitCode = EXIT_CODE_SUCCESS; - AdminClient adminClient = null; - ZkUtils zkUtils = null; + KafkaAdminClient kafkaAdminClient = null; + try { parseArguments(args); dryRun = options.has(dryRunOption); - adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption)); final String groupId = options.valueOf(applicationIdOption); + validateNoActiveConsumers(groupId); - zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()); + final Properties adminClientProperties = new Properties(); + adminClientProperties.put("bootstrap.servers", options.valueOf(bootstrapServerOption)); + kafkaAdminClient = (KafkaAdminClient) AdminClient.create(adminClientProperties); allTopics.clear(); - allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - - - if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + - "Make sure to stop all running application instances before running the reset tool."); - } + allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS)); if (dryRun) { System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----"); } maybeResetInputAndSeekToEndIntermediateTopicOffsets(); - maybeDeleteInternalTopics(zkUtils); + maybeDeleteInternalTopics(kafkaAdminClient); } catch (final Throwable e) { exitCode = EXIT_CODE_ERROR; System.err.println("ERROR: " + e); e.printStackTrace(System.err); } finally { - if (adminClient != null) { - adminClient.close(); - } - if (zkUtils != null) { - zkUtils.close(); + if (kafkaAdminClient != null) { + kafkaAdminClient.close(60, TimeUnit.SECONDS); } } return exitCode; } + private void validateNoActiveConsumers(final String groupId) { + kafka.admin.AdminClient olderAdminClient = null; + try { + olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption)); + if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + + "Make sure to stop all running application instances before running the reset tool."); + } + } finally { + if (olderAdminClient != null) { + olderAdminClient.close(); + } + } + } + private void parseArguments(final String[] args) throws IOException { final OptionParser optionParser = new OptionParser(false); @@ -148,11 +156,8 @@ public class StreamsResetter { .ofType(String.class) .defaultsTo("localhost:9092") .describedAs("urls"); - zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper url with format: HOST:POST") - .withRequiredArg() - .ofType(String.class) - .defaultsTo("localhost:2181") - .describedAs("url"); + zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly."); + inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.") .withRequiredArg() .ofType(String.class) @@ -314,30 +319,46 @@ public class StreamsResetter { return options.valuesOf(intermediateTopicsOption).contains(topic); } - private void maybeDeleteInternalTopics(final ZkUtils zkUtils) { + private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient) { System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); - - for (final String topic : allTopics) { - if (isInternalTopic(topic)) { - try { - if (!dryRun) { - final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{ - "--zookeeper", options.valueOf(zookeeperOption), - "--delete", "--topic", topic}); - TopicCommand.deleteTopic(zkUtils, commandOptions); - } else { - System.out.println("Topic: " + topic); - } - } catch (final RuntimeException e) { - System.err.println("ERROR: Deleting topic " + topic + " failed."); - throw e; + List<String> topicsToDelete = new ArrayList<>(); + for (final String listing : allTopics) { + if (isInternalTopic(listing)) { + if (!dryRun) { + topicsToDelete.add(listing); + } else { + System.out.println("Topic: " + listing); } } } + if (!dryRun) { + doDelete(topicsToDelete, adminClient); + } System.out.println("Done."); } + private void doDelete(final List<String> topicsToDelete, + final KafkaAdminClient adminClient) { + boolean hasDeleteErrors = false; + final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete); + final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values(); + + for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) { + try { + entry.getValue().get(30, TimeUnit.SECONDS); + } catch (Exception e) { + System.err.println("ERROR: deleting topic " + entry.getKey()); + e.printStackTrace(System.err); + hasDeleteErrors = true; + } + } + if (hasDeleteErrors) { + throw new RuntimeException("Encountered an error deleting one or more topics"); + } + } + + private boolean isInternalTopic(final String topicName) { return topicName.startsWith(options.valueOf(applicationIdOption) + "-") && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")); http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 897028d..d76f5da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -16,19 +16,14 @@ */ package org.apache.kafka.streams.integration; -import kafka.admin.AdminClient; -import kafka.server.KafkaConfig$; -import kafka.tools.StreamsResetter; -import kafka.utils.MockTime; -import kafka.utils.ZkUtils; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -56,6 +51,12 @@ import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; + +import kafka.admin.AdminClient; +import kafka.server.KafkaConfig$; +import kafka.tools.StreamsResetter; +import kafka.utils.MockTime; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -94,6 +95,7 @@ public class ResetIntegrationTest { private static int testNo = 0; private static AdminClient adminClient = null; + private static KafkaAdminClient kafkaAdminClient = null; private final MockTime mockTime = CLUSTER.time; private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); @@ -104,6 +106,11 @@ public class ResetIntegrationTest { adminClient.close(); adminClient = null; } + + if (kafkaAdminClient != null) { + kafkaAdminClient.close(10, TimeUnit.SECONDS); + kafkaAdminClient = null; + } } @Before @@ -114,6 +121,12 @@ public class ResetIntegrationTest { adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); } + if (kafkaAdminClient == null) { + Properties props = new Properties(); + props.put("bootstrap.servers", CLUSTER.bootstrapServers()); + kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(props); + } + // busy wait until cluster (ie, ConsumerGroupCoordinator) is available while (true) { Thread.sleep(50); @@ -338,20 +351,20 @@ public class ResetIntegrationTest { } private void cleanGlobal(final String intermediateUserTopic) { + // leaving --zookeeper arg here to ensure tool works if users add it final String[] parameters; if (intermediateUserTopic != null) { parameters = new String[]{ "--application-id", APP_ID + testNo, "--bootstrap-servers", CLUSTER.bootstrapServers(), - "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC, - "--intermediate-topics", INTERMEDIATE_USER_TOPIC + "--intermediate-topics", INTERMEDIATE_USER_TOPIC, + "--zookeeper", "localhost:2181" }; } else { parameters = new String[]{ "--application-id", APP_ID + testNo, "--bootstrap-servers", CLUSTER.bootstrapServers(), - "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC }; } @@ -363,7 +376,7 @@ public class ResetIntegrationTest { Assert.assertEquals(0, exitCode); } - private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) { + private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception { final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>(); expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); if (intermediateUserTopic != null) { @@ -374,25 +387,13 @@ public class ResetIntegrationTest { expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN); expectedRemainingTopicsAfterCleanup.add("__consumer_offsets"); - Set<String> allTopics; - ZkUtils zkUtils = null; - try { - zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()); - - do { - Utils.sleep(100); - allTopics = new HashSet<>(); - allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size()); - } finally { - if (zkUtils != null) { - zkUtils.close(); - } - } + final Set<String> allTopics = new HashSet<>(); + + final ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); + listTopicsOptions.listInternal(true); + allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS)); assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); + } private class WaitUntilConsumerGroupGotClosed implements TestCondition {