guozhangwang commented on a change in pull request #10460: URL: https://github.com/apache/kafka/pull/10460#discussion_r616386557
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -374,194 +356,18 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig, return brokerSideConfigEntry.value(); } - /** - * Prepares a set of given internal topics. - * - * If a topic does not exist creates a new topic. - * If a topic with the correct number of partitions exists ignores it. - * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again. - * @return the set of topics which had to be newly created - */ - public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) { - // we will do the validation / topic-creation in a loop, until we have confirmed all topics - // have existed with the expected number of partitions, or some create topic returns fatal errors. - log.debug("Starting to validate internal topics {} in partition assignor.", topics); - - long currentWallClockMs = time.milliseconds(); - final long deadlineMs = currentWallClockMs + retryTimeoutMs; - - Set<String> topicsNotReady = new HashSet<>(topics.keySet()); - final Set<String> newlyCreatedTopics = new HashSet<>(); - - while (!topicsNotReady.isEmpty()) { - final Set<String> tempUnknownTopics = new HashSet<>(); - topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics); - newlyCreatedTopics.addAll(topicsNotReady); - - if (!topicsNotReady.isEmpty()) { - final Set<NewTopic> newTopics = new HashSet<>(); - - for (final String topicName : topicsNotReady) { - if (tempUnknownTopics.contains(topicName)) { - // for the tempUnknownTopics, don't create topic for them - // we'll check again later if remaining retries > 0 - continue; - } - final InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName)); - final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, 
windowChangeLogAdditionalRetention); - - log.debug("Going to create topic {} with {} partitions and config {}.", - internalTopicConfig.name(), - internalTopicConfig.numberOfPartitions(), - topicConfig); - - newTopics.add( - new NewTopic( - internalTopicConfig.name(), - internalTopicConfig.numberOfPartitions(), - Optional.of(replicationFactor)) - .configs(topicConfig)); - } - - final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics); - - for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) { - final String topicName = createTopicResult.getKey(); - try { - createTopicResult.getValue().get(); - topicsNotReady.remove(topicName); - } catch (final InterruptedException fatalException) { - // this should not happen; if it ever happens it indicate a bug - Thread.currentThread().interrupt(); - log.error(INTERRUPTED_ERROR_MESSAGE, fatalException); Review comment: Is it an intentional decision to no longer log the interrupted error, and also to throw RTE instead of ISE? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org