showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r445601207
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -108,7 +111,8 @@ public void shouldReturnCorrectPartitionCounts() {
topic,
Collections.singletonList(new TopicPartitionInfo(0, broker1,
singleReplica, Collections.emptyList())),
null);
- assertEquals(Collections.singletonMap(topic, 1),
internalTopicManager.getNumPartitions(Collections.singleton(topic)));
+ assertEquals(Collections.singletonMap(topic, 1),
+
internalTopicManager.getNumPartitions(Collections.singleton(topic), new
HashSet<String>(), 1));
Review comment:
Updated. Thanks.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -218,13 +231,16 @@ public InternalTopicManager(final Admin adminClient,
final StreamsConfig streams
/**
* Check the existing topics to have correct number of partitions; and
return the remaining topics that needs to be created
*/
- private Set<String> validateTopics(final Set<String> topicsToValidate,
final Map<String, InternalTopicConfig> topicsMap) {
+ private Set<String> validateTopics(final Set<String> topicsToValidate,
+ final Map<String, InternalTopicConfig>
topicsMap,
+ final HashSet<String> tempUnknownTopics,
Review comment:
Fixed.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient,
final StreamsConfig streams
* Topics that were not able to get its description will simply not be
returned
*/
// visible for testing
- protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
- log.debug("Trying to check if topics {} have been created with
expected number of partitions.", topics);
-
- final DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(topics);
+ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+ final HashSet<String>
tempUnknownTopics,
+ final int
remainingRetries) {
+ final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+ allTopicsToDescribe.addAll(tempUnknownTopics);
+ log.debug("Trying to check if topics {} have been created with
expected number of partitions.", allTopicsToDescribe);
+
+ final DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(allTopicsToDescribe);
final Map<String, KafkaFuture<TopicDescription>> futures =
describeTopicsResult.values();
final Map<String, Integer> existedTopicPartition = new HashMap<>();
for (final Map.Entry<String, KafkaFuture<TopicDescription>>
topicFuture : futures.entrySet()) {
final String topicName = topicFuture.getKey();
try {
final TopicDescription topicDescription =
topicFuture.getValue().get();
- existedTopicPartition.put(
- topicFuture.getKey(),
- topicDescription.partitions().size());
+ existedTopicPartition.put(topicName,
topicDescription.partitions().size());
+ tempUnknownTopics.remove(topicName);
} catch (final InterruptedException fatalException) {
// this should not happen; if it ever happens it indicate a bug
Thread.currentThread().interrupt();
log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE,
fatalException);
} catch (final ExecutionException couldNotDescribeTopicException) {
final Throwable cause =
couldNotDescribeTopicException.getCause();
- if (cause instanceof UnknownTopicOrPartitionException ||
- cause instanceof LeaderNotAvailableException) {
- // This topic didn't exist or leader is not known yet,
proceed to try to create it
- log.debug("Topic {} is unknown or not found, hence not
existed yet: {}", topicName, cause.toString());
+ if (cause instanceof UnknownTopicOrPartitionException) {
+ // This topic didn't exist, proceed to try to create it
+ log.debug("Topic {} is unknown or not found, hence not
existed yet.\n" +
+ "Error message was: {}", topicName, cause.toString());
+ } else if (cause instanceof LeaderNotAvailableException) {
+ tempUnknownTopics.add(topicName);
+ if (remainingRetries > 0) {
Review comment:
Updated.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient,
final StreamsConfig streams
* Topics that were not able to get its description will simply not be
returned
*/
// visible for testing
- protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
- log.debug("Trying to check if topics {} have been created with
expected number of partitions.", topics);
-
- final DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(topics);
+ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+ final HashSet<String>
tempUnknownTopics,
+ final int
remainingRetries) {
Review comment:
Updated.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +291,41 @@ public void
shouldLogWhenTopicNotFoundAndNotThrowException() {
assertThat(
appender.getMessages(),
- hasItem("stream-thread [" + threadName + "] Topic
internal-topic is unknown or not found, hence not existed yet:" +
- "
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic
internal-topic not found.")
+ hasItem("stream-thread [" + threadName + "] Topic
internal-topic is unknown or not found, hence not existed yet.\n" +
+ "Error message was:
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic
internal-topic not found.")
);
}
}
+ @Test
+ public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+ final String leaderNotAvailableTopic = "LeaderNotAvailableTopic";
+ final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+ final InternalTopicManager topicManager = new
InternalTopicManager(admin, new StreamsConfig(config));
+
+ final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture =
new KafkaFutureImpl<>();
+ topicDescriptionFailFuture.completeExceptionally(new
LeaderNotAvailableException("Leader Not Available!"));
+
+ // simulate describeTopics got LeaderNotAvailableException
+
EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+ .andReturn(new MockDescribeTopicsResult(
Review comment:
Nice catch! Fixed.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -243,10 +259,18 @@ public InternalTopicManager(final Admin adminClient,
final StreamsConfig streams
throw new StreamsException(errorMsg);
}
} else {
- topicsToCreate.add(topicName);
+ // for the tempUnknownTopics, we'll check again later if
retries > 0
Review comment:
Fixed
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final
StreamsConfig streams
int remainingRetries = retries;
Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
+ final HashSet<String> tempUnknownTopics = new HashSet<>();
Review comment:
Fixed. Thanks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org