abbccdda commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r445277549



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
      * Topics that were not able to get its description will simply not be 
returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with 
expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> 
tempUnknownTopics,
+                                                    final int 
remainingRetries) {

Review comment:
       We could just pass in a boolean here to indicate whether there are 
remaining retries

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
         int remainingRetries = retries;
         Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
+        final HashSet<String> tempUnknownTopics = new HashSet<>();

Review comment:
       s/HashSet/Set?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -243,10 +259,18 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
                     throw new StreamsException(errorMsg);
                 }
             } else {
-                topicsToCreate.add(topicName);
+                // for the tempUnknownTopics, we'll check again later if 
retries > 0

Review comment:
       Could be merged with above `else`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
      * Topics that were not able to get its description will simply not be 
returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with 
expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> 
tempUnknownTopics,
+                                                    final int 
remainingRetries) {
+        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+        allTopicsToDescribe.addAll(tempUnknownTopics);

Review comment:
       Why do we need `allTopicsToDescribe`? It seems only queried once locally.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +291,41 @@ public void 
shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic 
internal-topic is unknown or not found, hence not existed yet:" +
-                    " 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic 
internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic 
internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic 
internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+        final String leaderNotAvailableTopic = "LeaderNotAvailableTopic";
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new 
InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = 
new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new 
LeaderNotAvailableException("Leader Not Available!"));
+
+        // simulate describeTopics got LeaderNotAvailableException
+        
EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+                .andReturn(new MockDescribeTopicsResult(

Review comment:
       Use 4 space format to align with other tests.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
      * Topics that were not able to get its description will simply not be 
returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with 
expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> 
tempUnknownTopics,
+                                                    final int 
remainingRetries) {
+        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+        allTopicsToDescribe.addAll(tempUnknownTopics);
+        log.debug("Trying to check if topics {} have been created with 
expected number of partitions.", allTopicsToDescribe);
+
+        final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(allTopicsToDescribe);
         final Map<String, KafkaFuture<TopicDescription>> futures = 
describeTopicsResult.values();
 
         final Map<String, Integer> existedTopicPartition = new HashMap<>();
         for (final Map.Entry<String, KafkaFuture<TopicDescription>> 
topicFuture : futures.entrySet()) {
             final String topicName = topicFuture.getKey();
             try {
                 final TopicDescription topicDescription = 
topicFuture.getValue().get();
-                existedTopicPartition.put(
-                    topicFuture.getKey(),
-                    topicDescription.partitions().size());
+                existedTopicPartition.put(topicName, 
topicDescription.partitions().size());
+                tempUnknownTopics.remove(topicName);
             } catch (final InterruptedException fatalException) {
                 // this should not happen; if it ever happens it indicate a bug
                 Thread.currentThread().interrupt();
                 log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
                 throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, 
fatalException);
             } catch (final ExecutionException couldNotDescribeTopicException) {
                 final Throwable cause = 
couldNotDescribeTopicException.getCause();
-                if (cause instanceof UnknownTopicOrPartitionException ||
-                    cause instanceof LeaderNotAvailableException) {
-                    // This topic didn't exist or leader is not known yet, 
proceed to try to create it
-                    log.debug("Topic {} is unknown or not found, hence not 
existed yet: {}", topicName, cause.toString());
+                if (cause instanceof UnknownTopicOrPartitionException) {
+                    // This topic didn't exist, proceed to try to create it
+                    log.debug("Topic {} is unknown or not found, hence not 
existed yet.\n" +
+                        "Error message was: {}", topicName, cause.toString());
+                } else if (cause instanceof LeaderNotAvailableException) {
+                    tempUnknownTopics.add(topicName);
+                    if (remainingRetries > 0) {

Review comment:
       Could reduce the if-else block as:
   ```
   if (remainingRetries <= 0) {
     // run out of retries, throw exception directly 
     throw new StreamsException(
       String.format("The leader of the Topic %s is not available after %d 
retries.", topicName, retries), cause);
    }
    log.debug("The leader of the Topic {} is not available, with {} retries 
left.\n" +
      "Error message was: {}", topicName, remainingRetries, cause.toString());
   
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -218,13 +231,16 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
     /**
      * Check the existing topics to have correct number of partitions; and 
return the remaining topics that needs to be created
      */
-    private Set<String> validateTopics(final Set<String> topicsToValidate, 
final Map<String, InternalTopicConfig> topicsMap) {
+    private Set<String> validateTopics(final Set<String> topicsToValidate,
+                                       final Map<String, InternalTopicConfig> 
topicsMap,
+                                       final HashSet<String> tempUnknownTopics,

Review comment:
       Similar here, we could reduce to Set

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -108,7 +111,8 @@ public void shouldReturnCorrectPartitionCounts() {
             topic,
             Collections.singletonList(new TopicPartitionInfo(0, broker1, 
singleReplica, Collections.emptyList())),
             null);
-        assertEquals(Collections.singletonMap(topic, 1), 
internalTopicManager.getNumPartitions(Collections.singleton(topic)));
+        assertEquals(Collections.singletonMap(topic, 1),
+                
internalTopicManager.getNumPartitions(Collections.singleton(topic), new 
HashSet<String>(), 1));

Review comment:
       Could use Collections.emptySet() if reduced to Set




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to