abbccdda commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r445277549
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams * Topics that were not able to get its description will simply not be returned */ // visible for testing - protected Map<String, Integer> getNumPartitions(final Set<String> topics) { - log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics); - - final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics); + protected Map<String, Integer> getNumPartitions(final Set<String> topics, + final HashSet<String> tempUnknownTopics, + final int remainingRetries) { Review comment: We could just pass in a boolean here to indicate whether there are remaining retries ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams int remainingRetries = retries; Set<String> topicsNotReady = new HashSet<>(topics.keySet()); final Set<String> newlyCreatedTopics = new HashSet<>(); + final HashSet<String> tempUnknownTopics = new HashSet<>(); Review comment: s/HashSet/Set? 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -243,10 +259,18 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams throw new StreamsException(errorMsg); } } else { - topicsToCreate.add(topicName); + // for the tempUnknownTopics, we'll check again later if retries > 0 Review comment: Could be merged with above `else` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams * Topics that were not able to get its description will simply not be returned */ // visible for testing - protected Map<String, Integer> getNumPartitions(final Set<String> topics) { - log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics); - - final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics); + protected Map<String, Integer> getNumPartitions(final Set<String> topics, + final HashSet<String> tempUnknownTopics, + final int remainingRetries) { + final Set<String> allTopicsToDescribe = new HashSet<>(topics); + allTopicsToDescribe.addAll(tempUnknownTopics); Review comment: Why do we need `allTopicsToDescribe`? It seems only queried once locally. 
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ########## @@ -287,12 +291,41 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() { assertThat( appender.getMessages(), - hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" + - " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.") + hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" + + "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.") ); } } + @Test + public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() { + final String leaderNotAvailableTopic = "LeaderNotAvailableTopic"; + final AdminClient admin = EasyMock.createNiceMock(AdminClient.class); + final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config)); + + final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>(); + topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!")); + + // simulate describeTopics got LeaderNotAvailableException + EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic))) + .andReturn(new MockDescribeTopicsResult( Review comment: Use 4-space indentation to align with the other tests. 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams * Topics that were not able to get its description will simply not be returned */ // visible for testing - protected Map<String, Integer> getNumPartitions(final Set<String> topics) { - log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics); - - final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics); + protected Map<String, Integer> getNumPartitions(final Set<String> topics, + final HashSet<String> tempUnknownTopics, + final int remainingRetries) { + final Set<String> allTopicsToDescribe = new HashSet<>(topics); + allTopicsToDescribe.addAll(tempUnknownTopics); + log.debug("Trying to check if topics {} have been created with expected number of partitions.", allTopicsToDescribe); + + final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(allTopicsToDescribe); final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values(); final Map<String, Integer> existedTopicPartition = new HashMap<>(); for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) { final String topicName = topicFuture.getKey(); try { final TopicDescription topicDescription = topicFuture.getValue().get(); - existedTopicPartition.put( - topicFuture.getKey(), - topicDescription.partitions().size()); + existedTopicPartition.put(topicName, topicDescription.partitions().size()); + tempUnknownTopics.remove(topicName); } catch (final InterruptedException fatalException) { // this should not happen; if it ever happens it indicate a bug Thread.currentThread().interrupt(); log.error(INTERRUPTED_ERROR_MESSAGE, fatalException); throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException); } catch (final 
ExecutionException couldNotDescribeTopicException) { final Throwable cause = couldNotDescribeTopicException.getCause(); - if (cause instanceof UnknownTopicOrPartitionException || - cause instanceof LeaderNotAvailableException) { - // This topic didn't exist or leader is not known yet, proceed to try to create it - log.debug("Topic {} is unknown or not found, hence not existed yet: {}", topicName, cause.toString()); + if (cause instanceof UnknownTopicOrPartitionException) { + // This topic didn't exist, proceed to try to create it + log.debug("Topic {} is unknown or not found, hence not existed yet.\n" + + "Error message was: {}", topicName, cause.toString()); + } else if (cause instanceof LeaderNotAvailableException) { + tempUnknownTopics.add(topicName); + if (remainingRetries > 0) { Review comment: Could reduce the if-else block as: ``` if (remainingRetries <= 0) { // run out of retries, throw exception directly throw new StreamsException( String.format("The leader of the Topic %s is not available after %d retries.", topicName, retries), cause); } log.debug("The leader of the Topic {} is not available, with {} retries left.\n" + "Error message was: {}", topicName, remainingRetries, cause.toString()); ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -218,13 +231,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams /** * Check the existing topics to have correct number of partitions; and return the remaining topics that needs to be created */ - private Set<String> validateTopics(final Set<String> topicsToValidate, final Map<String, InternalTopicConfig> topicsMap) { + private Set<String> validateTopics(final Set<String> topicsToValidate, + final Map<String, InternalTopicConfig> topicsMap, + final HashSet<String> tempUnknownTopics, Review comment: Similar here, we could reduce to Set ########## File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ########## @@ -108,7 +111,8 @@ public void shouldReturnCorrectPartitionCounts() { topic, Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())), null); - assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic))); + assertEquals(Collections.singletonMap(topic, 1), + internalTopicManager.getNumPartitions(Collections.singleton(topic), new HashSet<String>(), 1)); Review comment: Could use `Collections.emptySet()` if the parameter type is reduced to `Set`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org