ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r786445650



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -85,24 +86,51 @@ public void setup() {
                 );
             }
         }
+
+        return missingExternalSourceTopicsPerTopology;
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> 
computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
-                                                                           
final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new 
HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            
repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, 
topicConfig -> topicConfig)));
+    /**
+     * @param topicGroups                            information about the 
topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg 
which topics exist on the brokers
+     * @param topologiesWithMissingSourceTopics  set of missing user input 
topics, to be filled in by this method
+     */
+    private Map<String, InternalTopicConfig> 
computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> 
topicGroups,
+                                                                           
final Cluster clusterMetadata,
+                                                                           
final Map<String, Set<String>> topologiesWithMissingSourceTopics) {
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = 
new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : 
topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopicsPerTopology = new HashSet<>();
+            final Map<String, InternalTopicConfig> 
repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                
missingSourceTopicsPerTopology.addAll(computeMissingExternalSourceTopics(topicsInfo,
 clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, 
topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopicsPerTopology.isEmpty()) {
+                
allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                topologiesWithMissingSourceTopics.put(topologyName, 
missingSourceTopicsPerTopology);
+                log.error("Topology {} was missing source topics {} and will 
be excluded from the current assignment, "
+                              + "this can be due to the consumer client's 
metadata being stale or because they have "
+                              + "not been created yet. Please verify that you 
have created all input topics. When the "
+                              + "metadata is updated a new rebalance will be 
kicked off automatically and the topology "
+                              + "will retried at that time.", topologyName, 
missingSourceTopicsPerTopology);

Review comment:
       That's what I was trying to get across, but I can try to make it clearer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to