[ 
https://issues.apache.org/jira/browse/KAFKA-6318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308490#comment-16308490
 ] 

ASF GitHub Bot commented on KAFKA-6318:
---------------------------------------

guozhangwang closed pull request #4305: KAFKA-6318: StreamsResetter should 
return non-zero return code on error
URL: https://github.com/apache/kafka/pull/4305
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 4851a948399..4496876e09a 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -130,7 +130,7 @@ public int run(final String[] args,
 
             final HashMap<Object, Object> consumerConfig = new 
HashMap<>(config);
             consumerConfig.putAll(properties);
-            
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
+            exitCode = 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
             maybeDeleteInternalTopics(kafkaAdminClient, dryRun);
 
         } catch (final Throwable e) {
@@ -237,9 +237,10 @@ private void parseArguments(final String[] args) throws 
IOException {
         CommandLineUtils.checkInvalidArgs(optionParser, options, 
shiftByOption, allScenarioOptions.$minus(shiftByOption));
     }
 
-    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map 
consumerConfig, final boolean dryRun) throws Exception {
+    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map 
consumerConfig, final boolean dryRun) throws Exception {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = 
options.valuesOf(intermediateTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
 
         final List<String> notFoundInputTopics = new ArrayList<>();
         final List<String> notFoundIntermediateTopics = new ArrayList<>();
@@ -248,7 +249,7 @@ private void 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu
 
         if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
             System.out.println("No input or intermediate topics specified. 
Skipping seek.");
-            return;
+            return EXIT_CODE_SUCCESS;
         }
 
         if (inputTopics.size() != 0) {
@@ -316,6 +317,7 @@ private void 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu
                 for (final String topic : notFoundInputTopics) {
                     System.out.println("Topic: " + topic);
                 }
+                topicNotFound = EXIT_CODE_ERROR;
             }
 
             if (notFoundIntermediateTopics.size() > 0) {
@@ -330,6 +332,7 @@ private void 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu
             throw e;
         }
         System.out.println("Done.");
+        return topicNotFound;
     }
 
     // visible for testing
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 9131007b6bc..26673ca1926 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -73,6 +73,7 @@
     private static final String OUTPUT_TOPIC_2 = "outputTopic2";
     private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
     private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
+    private static final String NON_EXISTING_TOPIC = "nonExistingTopic2";
 
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
@@ -612,6 +613,30 @@ private void cleanGlobal(final Properties sslConfig,
         Assert.assertEquals(0, exitCode);
     }
 
+    void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
+
+        final Properties streamsConfiguration = prepareTest();
+        final List<String> parameterList = new ArrayList<>(
+                Arrays.asList("--application-id", APP_ID + testNo,
+                        "--bootstrap-servers", bootstrapServers,
+                        "--input-topics", NON_EXISTING_TOPIC));
+
+        final String[] parameters = parameterList.toArray(new 
String[parameterList.size()]);
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
+
+        // RUN
+        KafkaStreams streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.start();
+
+        final int exitCode = new StreamsResetter().run(parameters, 
cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+
+        streams.close();
+
+    }
+
     private void assertInternalTopicsGotDeleted(final String 
intermediateUserTopic) throws Exception {
         // do not use list topics request, but read from the embedded 
cluster's zookeeper path directly to confirm
         if (intermediateUserTopic != null) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d781d95c9c6..ef9a67dce75 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -18,14 +18,21 @@
 
 import kafka.server.KafkaConfig$;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import kafka.tools.StreamsResetter;
 
 import java.util.Properties;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+
 
 /**
  * Tests local state store and global application cleanup.
@@ -35,6 +42,10 @@
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER;
+    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+    private static final String APP_ID = "Integration-test";
+    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
+    private static int testNo = 1;
 
     static {
         final Properties props = new Properties();
@@ -81,4 +92,44 @@ public void 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic()
     public void 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws 
Exception {
         
super.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic();
     }
+
+    @Test
+    public void shouldNotAllowToResetWhileStreamsRunning() throws Exception {
+        super.shouldNotAllowToResetWhileStreamsIsRunning();
+    }
+
+    @Test
+    public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
+
+        final List<String> parameterList = new ArrayList<>(
+                Arrays.asList("--application-id", APP_ID + testNo,
+                        "--bootstrap-servers", bootstrapServers,
+                        "--input-topics", NON_EXISTING_TOPIC));
+
+        final String[] parameters = parameterList.toArray(new 
String[parameterList.size()]);
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, 
cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
+    @Test
+    public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws 
Exception {
+
+        final List<String> parameterList = new ArrayList<>(
+                Arrays.asList("--application-id", APP_ID + testNo,
+                        "--bootstrap-servers", bootstrapServers,
+                        "--intermediate-topics", NON_EXISTING_TOPIC));
+
+        final String[] parameters = parameterList.toArray(new 
String[parameterList.size()]);
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, 
cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> StreamsResetter should return non-zero return code on error
> -----------------------------------------------------------
>
>                 Key: KAFKA-6318
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6318
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, tools
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: siva santhalingam
>
> If users specify a non-existing topic as input parameter,  
> {{StreamsResetter}} only prints out an error message that the topic was not 
> found, but return code is still zero. We should return a non-zero return code 
> for this case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to