[
https://issues.apache.org/jira/browse/KAFKA-6318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308490#comment-16308490
]
ASF GitHub Bot commented on KAFKA-6318:
---------------------------------------
guozhangwang closed pull request #4305: KAFKA-6318: StreamsResetter should
return non-zero return code on error
URL: https://github.com/apache/kafka/pull/4305
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is displayed below for the
sake of provenance:
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 4851a948399..4496876e09a 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -130,7 +130,7 @@ public int run(final String[] args,
final HashMap<Object, Object> consumerConfig = new
HashMap<>(config);
consumerConfig.putAll(properties);
-
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
+ exitCode =
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
maybeDeleteInternalTopics(kafkaAdminClient, dryRun);
} catch (final Throwable e) {
@@ -237,9 +237,10 @@ private void parseArguments(final String[] args) throws
IOException {
CommandLineUtils.checkInvalidArgs(optionParser, options,
shiftByOption, allScenarioOptions.$minus(shiftByOption));
}
- private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map
consumerConfig, final boolean dryRun) throws Exception {
+ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map
consumerConfig, final boolean dryRun) throws Exception {
final List<String> inputTopics = options.valuesOf(inputTopicsOption);
final List<String> intermediateTopics =
options.valuesOf(intermediateTopicsOption);
+ int topicNotFound = EXIT_CODE_SUCCESS;
final List<String> notFoundInputTopics = new ArrayList<>();
final List<String> notFoundIntermediateTopics = new ArrayList<>();
@@ -248,7 +249,7 @@ private void
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu
if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
System.out.println("No input or intermediate topics specified.
Skipping seek.");
- return;
+ return EXIT_CODE_SUCCESS;
}
if (inputTopics.size() != 0) {
@@ -316,6 +317,7 @@ private void
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu
for (final String topic : notFoundInputTopics) {
System.out.println("Topic: " + topic);
}
+ topicNotFound = EXIT_CODE_ERROR;
}
if (notFoundIntermediateTopics.size() > 0) {
@@ -330,6 +332,7 @@ private void
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consu
throw e;
}
System.out.println("Done.");
+ return topicNotFound;
}
// visible for testing
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 9131007b6bc..26673ca1926 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -73,6 +73,7 @@
private static final String OUTPUT_TOPIC_2 = "outputTopic2";
private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
+ private static final String NON_EXISTING_TOPIC = "nonExistingTopic2";
private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
@@ -612,6 +613,30 @@ private void cleanGlobal(final Properties sslConfig,
Assert.assertEquals(0, exitCode);
}
+ void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
+
+ final Properties streamsConfiguration = prepareTest();
+ final List<String> parameterList = new ArrayList<>(
+ Arrays.asList("--application-id", APP_ID + testNo,
+ "--bootstrap-servers", bootstrapServers,
+ "--input-topics", NON_EXISTING_TOPIC));
+
+ final String[] parameters = parameterList.toArray(new
String[parameterList.size()]);
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" +
CLEANUP_CONSUMER_TIMEOUT);
+
+ // RUN
+ KafkaStreams streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+ streams.start();
+
+ final int exitCode = new StreamsResetter().run(parameters,
cleanUpConfig);
+ Assert.assertEquals(1, exitCode);
+
+ streams.close();
+
+ }
+
private void assertInternalTopicsGotDeleted(final String
intermediateUserTopic) throws Exception {
// do not use list topics request, but read from the embedded
cluster's zookeeper path directly to confirm
if (intermediateUserTopic != null) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d781d95c9c6..ef9a67dce75 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -18,14 +18,21 @@
import kafka.server.KafkaConfig$;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import kafka.tools.StreamsResetter;
import java.util.Properties;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Tests local state store and global application cleanup.
@@ -35,6 +42,10 @@
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER;
+ private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+ private static final String APP_ID = "Integration-test";
+ private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
+ private static int testNo = 1;
static {
final Properties props = new Properties();
@@ -81,4 +92,44 @@ public void
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic()
public void
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws
Exception {
super.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic();
}
+
+ @Test
+ public void shouldNotAllowToResetWhileStreamsRunning() throws Exception {
+ super.shouldNotAllowToResetWhileStreamsIsRunning();
+ }
+
+ @Test
+ public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
+
+ final List<String> parameterList = new ArrayList<>(
+ Arrays.asList("--application-id", APP_ID + testNo,
+ "--bootstrap-servers", bootstrapServers,
+ "--input-topics", NON_EXISTING_TOPIC));
+
+ final String[] parameters = parameterList.toArray(new
String[parameterList.size()]);
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" +
CLEANUP_CONSUMER_TIMEOUT);
+
+ final int exitCode = new StreamsResetter().run(parameters,
cleanUpConfig);
+ Assert.assertEquals(1, exitCode);
+ }
+
+ @Test
+ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws
Exception {
+
+ final List<String> parameterList = new ArrayList<>(
+ Arrays.asList("--application-id", APP_ID + testNo,
+ "--bootstrap-servers", bootstrapServers,
+ "--intermediate-topics", NON_EXISTING_TOPIC));
+
+ final String[] parameters = parameterList.toArray(new
String[parameterList.size()]);
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" +
CLEANUP_CONSUMER_TIMEOUT);
+
+ final int exitCode = new StreamsResetter().run(parameters,
cleanUpConfig);
+ Assert.assertEquals(1, exitCode);
+ }
+
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> StreamsResetter should return non-zero return code on error
> -----------------------------------------------------------
>
> Key: KAFKA-6318
> URL: https://issues.apache.org/jira/browse/KAFKA-6318
> Project: Kafka
> Issue Type: Bug
> Components: streams, tools
> Affects Versions: 1.0.0
> Reporter: Matthias J. Sax
> Assignee: siva santhalingam
>
> If users specify a non-existent topic as an input parameter,
> {{StreamsResetter}} only prints an error message saying that the topic was not
> found, but the return code is still zero. We should return a non-zero return code
> in this case.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)