cadonna commented on a change in pull request #10317: URL: https://github.com/apache/kafka/pull/10317#discussion_r595843144
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -580,4 +562,118 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig, return topicsToCreate; } + + /** + * Sets up internal topics. + * + * Either the given topic are all created or the method fails with an exception. + * + * @param topicConfigs internal topics to setup + */ + public void setup(final Map<String, InternalTopicConfig> topicConfigs) { + log.info("Starting to setup internal topics {}.", topicConfigs.keySet()); + + final long now = time.milliseconds(); + final long deadline = now + retryTimeoutMs; + + final Map<String, Map<String, String>> newTopicConfigs = topicConfigs.values().stream() Review comment: I would not rename this since the additional retention is only added in case of `topicConfig instanceof WindowedChangelogTopicConfig`, which is one of three. What about `streamsSideTopicConfigs` to emphasize that these are the topic configs that Streams sets by default possibly overidden by user code in the Streams app. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org