guozhangwang commented on a change in pull request #10460:
URL: https://github.com/apache/kafka/pull/10460#discussion_r616386557



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -374,194 +356,18 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
-    /**
-     * Prepares a set of given internal topics.
-     *
-     * If a topic does not exist creates a new topic.
-     * If a topic with the correct number of partitions exists ignores it.
-     * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
-     * @return the set of topics which had to be newly created
-     */
-    public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
-        // we will do the validation / topic-creation in a loop, until we have confirmed all topics
-        // have existed with the expected number of partitions, or some create topic returns fatal errors.
-        log.debug("Starting to validate internal topics {} in partition assignor.", topics);
-
-        long currentWallClockMs = time.milliseconds();
-        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
-
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
-        final Set<String> newlyCreatedTopics = new HashSet<>();
-
-        while (!topicsNotReady.isEmpty()) {
-            final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
-
-            if (!topicsNotReady.isEmpty()) {
-                final Set<NewTopic> newTopics = new HashSet<>();
-
-                for (final String topicName : topicsNotReady) {
-                    if (tempUnknownTopics.contains(topicName)) {
-                        // for the tempUnknownTopics, don't create topic for them
-                        // we'll check again later if remaining retries > 0
-                        continue;
-                    }
-                    final InternalTopicConfig internalTopicConfig = Objects.requireNonNull(topics.get(topicName));
-                    final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
-
-                    log.debug("Going to create topic {} with {} partitions and config {}.",
-                        internalTopicConfig.name(),
-                        internalTopicConfig.numberOfPartitions(),
-                        topicConfig);
-
-                    newTopics.add(
-                        new NewTopic(
-                            internalTopicConfig.name(),
-                            internalTopicConfig.numberOfPartitions(),
-                            Optional.of(replicationFactor))
-                            .configs(topicConfig));
-                }
-
-                final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
-
-                for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
-                    final String topicName = createTopicResult.getKey();
-                    try {
-                        createTopicResult.getValue().get();
-                        topicsNotReady.remove(topicName);
-                    } catch (final InterruptedException fatalException) {
-                        // this should not happen; if it ever happens it indicate a bug
-                        Thread.currentThread().interrupt();
-                        log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);

Review comment:
       Is it intentional that we no longer log the interrupted error, and that we now throw a RuntimeException (RTE) instead of an IllegalStateException (ISE)?
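
       For reference, a minimal standalone sketch of the handling the question refers to: restore the interrupt flag, log, then throw an ISE. This is an illustration only, not the actual `InternalTopicManager` code. The class `InterruptHandlingSketch`, the method `awaitOrFail`, and the message text are hypothetical, a plain `java.util.concurrent.Future` stands in for `KafkaFuture`, and `System.err` stands in for the logger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class InterruptHandlingSketch {

    // Hypothetical stand-in for INTERRUPTED_ERROR_MESSAGE; the real text is not shown in this diff.
    static final String INTERRUPTED_MESSAGE = "Thread got interrupted.";

    // Hypothetical helper: waits on a future and treats interruption as fatal.
    public static void awaitOrFail(final Future<Void> future) {
        try {
            future.get();
        } catch (final InterruptedException fatalException) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            // Stand-in for log.error(INTERRUPTED_ERROR_MESSAGE, fatalException).
            System.err.println(INTERRUPTED_MESSAGE + " " + fatalException);
            // Throw an IllegalStateException (ISE) rather than a bare RuntimeException (RTE).
            throw new IllegalStateException(INTERRUPTED_MESSAGE, fatalException);
        } catch (final ExecutionException executionException) {
            // Other failures are out of scope for this sketch; wrap the cause and rethrow.
            throw new RuntimeException(executionException.getCause());
        }
    }

    public static void main(final String[] args) {
        // Interrupt the current thread up front so that get() on a never-completed future fails.
        Thread.currentThread().interrupt();
        try {
            awaitOrFail(new CompletableFuture<>());
        } catch (final IllegalStateException e) {
            System.out.println("Caught ISE; interrupt flag restored: "
                + Thread.currentThread().isInterrupted());
        }
    }
}
```

       Since ISE is a subclass of RTE, a caller that catches `RuntimeException` would handle both; the difference only matters to callers that catch `IllegalStateException` specifically, or that rely on the error log line.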



