This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c238af2 MINOR: Remove throwing exception if not found from describe
topics (#6112)
c238af2 is described below
commit c238af29bf50cade5aa10c1bb2678ad01cfbbf47
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Jan 10 13:03:11 2019 -0500
MINOR: Remove throwing exception if not found from describe topics (#6112)
We recently improved the handling of the InternalTopicManager retries with
#6085. The AdminClient will throw an InvalidTopicException if the topic is not
found. We need to ignore that exception as when calling AdminClient#describe we
may not have had a chance to create the topic yet, especially with the case of
internal topics
I've created a new test asserting that when an InvalidTopicException is
thrown when the topic is not found we continue on.
Reviewers: John Roesler <[email protected]>, Guozhang Wang
<[email protected]>
---
.../processor/internals/InternalTopicManager.java | 7 +++--
.../internals/InternalTopicManagerTest.java | 31 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 40c25d1..ee7fd3e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -203,9 +204,11 @@ public class InternalTopicManager {
} catch (final ExecutionException couldNotDescribeTopicException) {
final Throwable cause =
couldNotDescribeTopicException.getCause();
if (cause instanceof UnknownTopicOrPartitionException ||
- cause instanceof LeaderNotAvailableException) {
+ cause instanceof LeaderNotAvailableException ||
+ (cause instanceof InvalidTopicException &&
+ cause.getMessage().equals("Topic " + topicName + " not
found."))) {
// This topic didn't exist or leader is not known yet,
proceed to try to create it
- log.debug("Topic {} is unknown, hence not existed yet.",
topicName);
+ log.debug("Topic {} is unknown or not found, hence not
existed yet.", topicName);
} else {
log.error("Unexpected error during topic description for
{}.\n" +
"Error message was: {}", topicName, cause.toString());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index e91bf32..074228a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -193,6 +194,36 @@ public class InternalTopicManagerTest {
}
@Test
+ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
+ LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
+ final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister();
+ mockAdminClient.addTopic(
+ false,
+ topic,
+ Collections.singletonList(new TopicPartitionInfo(0, broker1,
cluster, Collections.emptyList())),
+ null);
+
+ final InternalTopicConfig internalTopicConfig = new
RepartitionTopicConfig(topic, Collections.emptyMap());
+ internalTopicConfig.setNumberOfPartitions(1);
+
+ final InternalTopicConfig internalTopicConfigII = new
RepartitionTopicConfig("internal-topic", Collections.emptyMap());
+ internalTopicConfigII.setNumberOfPartitions(1);
+
+ final Map<String, InternalTopicConfig> topicConfigMap = new
HashMap<>();
+ topicConfigMap.put(topic, internalTopicConfig);
+ topicConfigMap.put("internal-topic", internalTopicConfigII);
+
+
+ internalTopicManager.makeReady(topicConfigMap);
+ boolean foundExpectedMessage = false;
+ for (final String message : appender.getMessages()) {
+ foundExpectedMessage |= message.contains("Topic internal-topic is
unknown or not found, hence not existed yet.");
+ }
+ assertTrue(foundExpectedMessage);
+
+ }
+
+ @Test
public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
mockAdminClient.addTopic(
false,