Repository: kafka Updated Branches: refs/heads/trunk 2a9f7af6c -> f7976d2fc
KAFKA-4008: Module "tools" should not be dependent on "core" moved streams application reset tool from tools to core Author: Matthias J. Sax <[email protected]> Reviewers: Ismael Juma <[email protected]>, Damian Guy <[email protected]>, Guozhang Wang <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1685 from mjsax/moveResetTool (cherry picked from commit f2405a73ea2dd4b636832b7f8729fb06a04de1d5) Signed-off-by: Ewen Cheslack-Postava <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7976d2f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7976d2f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7976d2f Branch: refs/heads/trunk Commit: f7976d2fc1793d0f635b42eb4dca3810e40c4cc8 Parents: 2a9f7af Author: Matthias J. Sax <[email protected]> Authored: Mon Aug 1 20:12:22 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Aug 1 20:12:38 2016 -0700 ---------------------------------------------------------------------- bin/kafka-streams-application-reset.sh | 2 +- build.gradle | 2 - checkstyle/import-control-core.xml | 2 + checkstyle/import-control.xml | 4 +- .../main/scala/kafka/tools/StreamsResetter.java | 268 +++++++++++++++++++ .../integration/ResetIntegrationTest.java | 2 +- .../org/apache/kafka/tools/StreamsResetter.java | 260 ------------------ 7 files changed, 273 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/bin/kafka-streams-application-reset.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-streams-application-reset.sh b/bin/kafka-streams-application-reset.sh index 26ab766..3363732 100755 --- a/bin/kafka-streams-application-reset.sh +++ b/bin/kafka-streams-application-reset.sh @@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export 
KAFKA_HEAP_OPTS="-Xmx512M" fi -exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.StreamsResetter "$@" +exec $(dirname $0)/kafka-run-class.sh kafka.tools.StreamsResetter "$@" http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 67ddfdb..ead2520 100644 --- a/build.gradle +++ b/build.gradle @@ -641,7 +641,6 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { - compile project(':core') compile project(':clients') compile project(':log4j-appender') compile libs.argparse4j @@ -690,7 +689,6 @@ project(':streams') { testCompile project(':clients').sourceSets.test.output testCompile project(':core') testCompile project(':core').sourceSets.test.output - testCompile project(':tools') testCompile libs.junit testRuntime libs.slf4jlog4j http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/checkstyle/import-control-core.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index d53e9e8..5714bfd 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -53,10 +53,12 @@ </subpackage> <subpackage name="tools"> + <allow pkg="kafka.admin" /> <allow pkg="kafka.javaapi" /> <allow pkg="kafka.producer" /> <allow pkg="kafka.consumer" /> <allow pkg="joptsimple" /> + <allow pkg="org.apache.kafka.clients.consumer" /> </subpackage> <subpackage name="examples"> http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 1052d8e..632b516 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -123,8 +123,6 @@ <allow pkg="com.fasterxml.jackson" /> <allow 
pkg="net.sourceforge.argparse4j" /> <allow pkg="org.apache.log4j" /> - <allow pkg="joptsimple" /> - <allow pkg="kafka" /> </subpackage> <subpackage name="streams"> @@ -144,6 +142,7 @@ <subpackage name="integration"> <allow pkg="kafka.admin" /> <allow pkg="kafka.server" /> + <allow pkg="kafka.tools" /> <allow pkg="kafka.utils" /> <allow pkg="kafka.zk" /> <allow pkg="kafka.log" /> @@ -151,7 +150,6 @@ <allow pkg="scala.collection" /> <allow pkg="org.I0Itec.zkclient" /> <allow pkg="org.hamcrest" /> - <allow pkg="org.apache.kafka.tools" /> </subpackage> <subpackage name="state"> http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java new file mode 100644 index 0000000..8e463d1 --- /dev/null +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import kafka.admin.TopicCommand; +import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. + * <p> + * <strong>This class is not part of public API. For backward compatibility, use the provided script in "bin/" instead of calling this class directly from your code.</strong> + * <p> + * Resetting the processing state of an application includes the following actions: + * <ol> + * <li>setting the application's consumer offsets for input and internal topics to zero</li> + * <li>skip over all intermediate user topics (i.e., "seekToEnd" for consumers of intermediate topics)</li> + * <li>deleting any topics created internally by Kafka Streams for this application</li> + * </ol> + * <p> + * Do only use this tool if <strong>no</strong> application instance is running. Otherwise, the application will get into an invalid state and crash or produce wrong results. + * <p> + * If you run multiple application instances, running this tool once is sufficient. + * However, you need to call {@code KafkaStreams#cleanUp()} before re-starting any instance (to clean local state store directory). + * Otherwise, your application is in an invalid state. 
+ * <p> + * User output topics will not be deleted or modified by this tool. + * If downstream applications consume intermediate or output topics, it is the user's responsibility to adjust those applications manually if required. + */ +@InterfaceStability.Unstable +public class StreamsResetter { + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_ERROR = 1; + + private static OptionSpec<String> bootstrapServerOption; + private static OptionSpec<String> zookeeperOption; + private static OptionSpec<String> applicationIdOption; + private static OptionSpec<String> inputTopicsOption; + private static OptionSpec<String> intermediateTopicsOption; + + private OptionSet options = null; + private final Properties consumerConfig = new Properties(); + private final List<String> allTopics = new LinkedList<>(); + + public int run(final String[] args) { + return run(args, new Properties()); + } + + public int run(final String[] args, final Properties config) { + this.consumerConfig.clear(); + this.consumerConfig.putAll(config); + + int exitCode = EXIT_CODE_SUCCESS; + + ZkUtils zkUtils = null; + try { + parseArguments(args); + + zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption), + 30000, + 30000, + JaasUtils.isZkSecurityEnabled()); + + this.allTopics.clear(); + this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + + resetInputAndInternalTopicOffsets(); + seekToEndIntermediateTopics(); + deleteInternalTopics(zkUtils); + } catch (final Exception e) { + exitCode = EXIT_CODE_ERROR; + System.err.println("ERROR: " + e.getMessage()); + } finally { + if (zkUtils != null) { + zkUtils.close(); + } + } + + return exitCode; + } + + private void parseArguments(final String[] args) throws IOException { + final OptionParser optionParser = new OptionParser(); + applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)") + .withRequiredArg() + .ofType(String.class) + 
.describedAs("id") + .required(); + bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("localhost:9092") + .describedAs("urls"); + zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST") + .withRequiredArg() + .ofType(String.class) + .defaultsTo("localhost:2181") + .describedAs("url"); + inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics") + .withRequiredArg() + .ofType(String.class) + .withValuesSeparatedBy(',') + .describedAs("list"); + + try { + this.options = optionParser.parse(args); + } catch (final OptionException e) { + optionParser.printHelpOn(System.err); + throw e; + } + } + + private void resetInputAndInternalTopicOffsets() { + final List<String> inputTopics = this.options.valuesOf(inputTopicsOption); + + if (inputTopics.size() == 0) { + System.out.println("No input topics specified."); + } else { + System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics."); + } + + final Properties config = new Properties(); + config.putAll(this.consumerConfig); + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption)); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption)); + config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + for (final String inTopic : inputTopics) { + if (!this.allTopics.contains(inTopic)) { + System.out.println("Input topic " + inTopic + " not found. 
Skipping."); + } + } + + for (final String topic : this.allTopics) { + if (isInputTopic(topic) || isInternalTopic(topic)) { + System.out.println("Topic: " + topic); + + try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + client.subscribe(Collections.singleton(topic)); + client.poll(1); + + final Set<TopicPartition> partitions = client.assignment(); + client.seekToBeginning(partitions); + for (final TopicPartition p : partitions) { + client.position(p); + } + client.commitSync(); + } catch (final RuntimeException e) { + System.err.println("ERROR: Resetting offsets for topic " + topic + " failed."); + throw e; + } + } + } + + System.out.println("Done."); + } + + private boolean isInputTopic(final String topic) { + return this.options.valuesOf(inputTopicsOption).contains(topic); + } + + private void seekToEndIntermediateTopics() { + final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption); + + if (intermediateTopics.size() == 0) { + System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets."); + return; + } + + System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics); + + final Properties config = new Properties(); + config.putAll(this.consumerConfig); + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption)); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption)); + config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + for (final String topic : intermediateTopics) { + if (this.allTopics.contains(topic)) { + System.out.println("Topic: " + topic); + + try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + client.subscribe(Collections.singleton(topic)); + client.poll(1); + + final 
Set<TopicPartition> partitions = client.assignment(); + client.seekToEnd(partitions); + for (final TopicPartition p : partitions) { + client.position(p); + } + client.commitSync(); + } catch (final RuntimeException e) { + System.err.println("ERROR: Seek-to-end for topic " + topic + " failed."); + throw e; + } + } else { + System.out.println("Topic " + topic + " not found. Skipping."); + } + } + + System.out.println("Done."); + } + + private void deleteInternalTopics(final ZkUtils zkUtils) { + System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption)); + + for (final String topic : this.allTopics) { + if (isInternalTopic(topic)) { + final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{ + "--zookeeper", this.options.valueOf(zookeeperOption), + "--delete", "--topic", topic}); + try { + TopicCommand.deleteTopic(zkUtils, commandOptions); + } catch (final RuntimeException e) { + System.err.println("ERROR: Deleting topic " + topic + " failed."); + throw e; + } + } + } + + System.out.println("Done."); + } + + private boolean isInternalTopic(final String topicName) { + return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-") + && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")); + } + + public static void main(final String[] args) { + System.exit(new StreamsResetter().run(args)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 85aff26..4d13b30 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration; +import kafka.tools.StreamsResetter; import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -37,7 +38,6 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.TestUtils; -import org.apache.kafka.tools.StreamsResetter; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java deleted file mode 100644 index 734c15b..0000000 --- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.kafka.tools; - -import joptsimple.OptionException; -import joptsimple.OptionParser; -import joptsimple.OptionSet; -import joptsimple.OptionSpec; -import kafka.admin.TopicCommand; -import kafka.utils.ZkUtils; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.security.JaasUtils; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; - -import java.io.IOException; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -/** - * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. - * <p> - * Resetting the processing state of an application includes the following actions: - * <ol> - * <li>setting the application's consumer offsets for input and internal topics to zero</li> - * <li>skip over all intermediate user topics (i.e., "seekToEnd" for consumers of intermediate topics)</li> - * <li>deleting any topics created internally by Kafka Streams for this application</li> - * </ol> - * <p> - * Do only use this tool if <strong>no</strong> application instance is running. Otherwise, the application will get into an invalid state and crash or produce wrong results. - * <p> - * If you run multiple application instances, running this tool once is sufficient. - * However, you need to call {@code KafkaStreams#cleanUp()} before re-starting any instance (to clean local state store directory). - * Otherwise, your application is in an invalid state. - * <p> - * User output topics will not be deleted or modified by this tool. - * If downstream applications consume intermediate or output topics, it is the user's responsibility to adjust those applications manually if required. 
- */ -public class StreamsResetter { - private static final int EXIT_CODE_SUCCESS = 0; - private static final int EXIT_CODE_ERROR = 1; - - private static OptionSpec<String> bootstrapServerOption; - private static OptionSpec<String> zookeeperOption; - private static OptionSpec<String> applicationIdOption; - private static OptionSpec<String> inputTopicsOption; - private static OptionSpec<String> intermediateTopicsOption; - - private OptionSet options = null; - private final Properties consumerConfig = new Properties(); - private final List<String> allTopics = new LinkedList<>(); - - public int run(final String[] args) { - return run(args, new Properties()); - } - - public int run(final String[] args, final Properties config) { - this.consumerConfig.clear(); - this.consumerConfig.putAll(config); - - int exitCode = EXIT_CODE_SUCCESS; - - ZkUtils zkUtils = null; - try { - parseArguments(args); - - zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()); - - this.allTopics.clear(); - this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - - resetInputAndInternalTopicOffsets(); - seekToEndIntermediateTopics(); - deleteInternalTopics(zkUtils); - } catch (final Exception e) { - exitCode = EXIT_CODE_ERROR; - System.err.println("ERROR: " + e.getMessage()); - } finally { - if (zkUtils != null) { - zkUtils.close(); - } - } - - return exitCode; - } - - private void parseArguments(final String[] args) throws IOException { - final OptionParser optionParser = new OptionParser(); - applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)") - .withRequiredArg() - .ofType(String.class) - .describedAs("id") - .required(); - bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2") - .withRequiredArg() - .ofType(String.class) - 
.defaultsTo("localhost:9092") - .describedAs("urls"); - zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST") - .withRequiredArg() - .ofType(String.class) - .defaultsTo("localhost:2181") - .describedAs("url"); - inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics") - .withRequiredArg() - .ofType(String.class) - .withValuesSeparatedBy(',') - .describedAs("list"); - intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics") - .withRequiredArg() - .ofType(String.class) - .withValuesSeparatedBy(',') - .describedAs("list"); - - try { - this.options = optionParser.parse(args); - } catch (final OptionException e) { - optionParser.printHelpOn(System.err); - throw e; - } - } - - private void resetInputAndInternalTopicOffsets() { - final List<String> inputTopics = this.options.valuesOf(inputTopicsOption); - - if (inputTopics.size() == 0) { - System.out.println("No input topics specified."); - } else { - System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics."); - } - - final Properties config = new Properties(); - config.putAll(this.consumerConfig); - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption)); - config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption)); - config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - - for (final String inTopic : inputTopics) { - if (!this.allTopics.contains(inTopic)) { - System.out.println("Input topic " + inTopic + " not found. 
Skipping."); - } - } - - for (final String topic : this.allTopics) { - if (isInputTopic(topic) || isInternalTopic(topic)) { - System.out.println("Topic: " + topic); - - try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { - client.subscribe(Collections.singleton(topic)); - client.poll(1); - - final Set<TopicPartition> partitions = client.assignment(); - client.seekToBeginning(partitions); - for (final TopicPartition p : partitions) { - client.position(p); - } - client.commitSync(); - } catch (final RuntimeException e) { - System.err.println("ERROR: Resetting offsets for topic " + topic + " failed."); - throw e; - } - } - } - - System.out.println("Done."); - } - - private boolean isInputTopic(final String topic) { - return this.options.valuesOf(inputTopicsOption).contains(topic); - } - - private void seekToEndIntermediateTopics() { - final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption); - - if (intermediateTopics.size() == 0) { - System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets."); - return; - } - - System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics); - - final Properties config = new Properties(); - config.putAll(this.consumerConfig); - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption)); - config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption)); - config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - - for (final String topic : intermediateTopics) { - if (this.allTopics.contains(topic)) { - System.out.println("Topic: " + topic); - - try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { - client.subscribe(Collections.singleton(topic)); - client.poll(1); - - final 
Set<TopicPartition> partitions = client.assignment(); - client.seekToEnd(partitions); - for (final TopicPartition p : partitions) { - client.position(p); - } - client.commitSync(); - } catch (final RuntimeException e) { - System.err.println("ERROR: Seek-to-end for topic " + topic + " failed."); - throw e; - } - } else { - System.out.println("Topic " + topic + " not found. Skipping."); - } - } - - System.out.println("Done."); - } - - private void deleteInternalTopics(final ZkUtils zkUtils) { - System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption)); - - for (final String topic : this.allTopics) { - if (isInternalTopic(topic)) { - final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{ - "--zookeeper", this.options.valueOf(zookeeperOption), - "--delete", "--topic", topic}); - try { - TopicCommand.deleteTopic(zkUtils, commandOptions); - } catch (final RuntimeException e) { - System.err.println("ERROR: Deleting topic " + topic + " failed."); - throw e; - } - } - } - - System.out.println("Done."); - } - - private boolean isInternalTopic(final String topicName) { - return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-") - && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")); - } - - public static void main(final String[] args) { - System.exit(new StreamsResetter().run(args)); - } - -}
