cadonna commented on a change in pull request #10317:
URL: https://github.com/apache/kafka/pull/10317#discussion_r595843144



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -580,4 +562,118 @@ private String getBrokerSideConfigValue(final Config 
brokerSideTopicConfig,
 
         return topicsToCreate;
     }
+
+    /**
+     * Sets up internal topics.
+     *
+     * Either the given topic are all created or the method fails with an 
exception.
+     *
+     * @param topicConfigs internal topics to setup
+     */
+    public void setup(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.info("Starting to setup internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final Map<String, Map<String, String>> newTopicConfigs = 
topicConfigs.values().stream()

Review comment:
       I would not rename this since the additional retention is only added in 
case of `topicConfig instanceof WindowedChangelogTopicConfig`, which is one of 
three. What about `streamsSideTopicConfigs` to emphasize that these are the 
topic configs that Streams sets by default possibly overidden by user code in 
the Streams app. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to