showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r446126644
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -125,35 +131,37 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
- for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
- final String topicName = createTopicResult.getKey();
- try {
- createTopicResult.getValue().get();
- topicsNotReady.remove(topicName);
- } catch (final InterruptedException fatalException) {
- // this should not happen; if it ever happens it indicate a bug
- Thread.currentThread().interrupt();
- log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
- throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
- } catch (final ExecutionException executionException) {
- final Throwable cause = executionException.getCause();
- if (cause instanceof TopicExistsException) {
- // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation.
- log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
- "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
- "Error message was: {}", topicName, retryBackOffMs, cause.toString());
- } else {
- log.error("Unexpected error during topic creation for {}.\n" +
- "Error message was: {}", topicName, cause.toString());
- throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
+ if (createTopicsResult != null) {
+ for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
Review comment:
We need the null check for `createTopicsResult` because `newTopics` might
be empty.
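
To illustrate the guard described above, here is a minimal, self-contained sketch. It does not use the Kafka API: `NullGuardSketch`, `createTopics`, and `topicsStillNotReady` are hypothetical names, and `CompletableFuture` stands in for `KafkaFuture`. The sketch assumes the create call yields `null` when there is nothing to create; the hunk itself does not show under which conditions `createTopicsResult` is actually null.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class NullGuardSketch {

    // Hypothetical stand-in for the admin call. ASSUMPTION: it yields null
    // when the set of topics to create is empty.
    static Map<String, CompletableFuture<Void>> createTopics(final Set<String> newTopics) {
        if (newTopics.isEmpty()) {
            return null;
        }
        final Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        for (final String name : newTopics) {
            futures.put(name, CompletableFuture.<Void>completedFuture(null));
        }
        return futures;
    }

    // Returns the topics that are still not ready after waiting on the results.
    // The null check lets the caller pass a missing result without an NPE.
    static Set<String> topicsStillNotReady(final Set<String> topicsNotReady,
                                           final Map<String, CompletableFuture<Void>> results) {
        final Set<String> remaining = new HashSet<>(topicsNotReady);
        if (results != null) {
            for (final Map.Entry<String, CompletableFuture<Void>> entry : results.entrySet()) {
                try {
                    entry.getValue().get();
                    remaining.remove(entry.getKey());
                } catch (final InterruptedException fatal) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while creating topics", fatal);
                } catch (final ExecutionException retriable) {
                    // Keep the topic in 'remaining' so a later round can retry it.
                }
            }
        }
        return remaining;
    }

    public static void main(final String[] args) {
        final Set<String> pending = new HashSet<>(Collections.singleton("app-repartition"));
        // Topic created successfully: nothing remains.
        System.out.println(topicsStillNotReady(pending, createTopics(pending))); // prints []
        // Nothing was requested, so the result is null and the loop is skipped.
        System.out.println(topicsStillNotReady(pending, createTopics(Collections.emptySet()))); // prints [app-repartition]
    }
}
```

An alternative design would be to return early when `newTopics` is empty, which avoids the extra nesting level; whether that fits here depends on the code that follows the loop, which is not visible in this hunk.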