Repository: kafka
Updated Branches:
  refs/heads/trunk 6cae5ec66 -> 3ab0456db
KAFKA-5525: Streams reset tool should have same console output with or without dry-run Fixed console output to be consistent with/without dry-run option Author: ppatierno <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]> Closes #3443 from ppatierno/kafka-5525 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3ab0456d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3ab0456d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3ab0456d Branch: refs/heads/trunk Commit: 3ab0456db839d8959b2846b9a7e4cc28b4c822c7 Parents: 6cae5ec Author: Paolo Patierno <[email protected]> Authored: Wed Jul 5 15:05:05 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Jul 5 15:05:05 2017 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/tools/StreamsResetter.java | 41 +++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3ab0456d/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index d2c5e14..55989f4 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -189,13 +189,11 @@ public class StreamsResetter { return; } - if (!dryRun) { - if (inputTopics.size() != 0) { - System.out.println("Seek-to-beginning for input topics " + inputTopics); - } - if (intermediateTopics.size() != 0) { - System.out.println("Seek-to-end for intermediate topics " + intermediateTopics); - } + if (inputTopics.size() != 0) { + System.out.println("Seek-to-beginning for input topics " + inputTopics); + } + if 
(intermediateTopics.size() != 0) { + System.out.println("Seek-to-end for intermediate topics " + intermediateTopics); } final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size() + intermediateTopics.size()); @@ -278,18 +276,16 @@ public class StreamsResetter { final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption); if (intermediateTopicPartitions.size() > 0) { + System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")"); + for (final String topic : intermediateTopics) { + if (allTopics.contains(topic)) { + System.out.println("Topic: " + topic); + } + } if (!dryRun) { client.seekToEnd(intermediateTopicPartitions); - } else { - System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")"); - for (final String topic : intermediateTopics) { - if (allTopics.contains(topic)) { - System.out.println("Topic: " + topic); - } - } } } - } private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client, @@ -299,15 +295,14 @@ public class StreamsResetter { final String groupId = options.valueOf(applicationIdOption); if (inputTopicPartitions.size() > 0) { + System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")"); + for (final String topic : inputTopics) { + if (allTopics.contains(topic)) { + System.out.println("Topic: " + topic); + } + } if (!dryRun) { client.seekToBeginning(inputTopicPartitions); - } else { - System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")"); - for (final String topic : inputTopics) { - if (allTopics.contains(topic)) { - System.out.println("Topic: " + topic); - } - } } } } @@ -350,7 +345,7 @@ public class StreamsResetter { } private void printHelp(OptionParser parser) throws IOException { - System.err.println("The Application Reset Tool allows you to quickly reset an 
application in order to reprocess " + System.err.println("The Streams Reset Tool allows you to quickly reset an application in order to reprocess " + "its data from scratch.\n" + "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of " + "intermediate topics (topics used in the through() method).\n"
