ableegoldman commented on a change in pull request #11600: URL: https://github.com/apache/kafka/pull/11600#discussion_r786445650
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java ########## @@ -85,24 +86,51 @@ public void setup() { ); } } + + return missingExternalSourceTopicsPerTopology; } public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() { return Collections.unmodifiableMap(topicPartitionInfos); } - private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups, - final Cluster clusterMetadata) { - - final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>(); - for (final TopicsInfo topicsInfo : topicGroups.values()) { - checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata); - repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream() - .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig))); + /** + * @param topicGroups information about the topic groups (subtopologies) in this application + * @param clusterMetadata cluster metadata, eg which topics exist on the brokers + * @param topologiesWithMissingSourceTopics set of missing user input topics, to be filled in by this method + */ + private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> topicGroups, + final Cluster clusterMetadata, + final Map<String, Set<String>> topologiesWithMissingSourceTopics) { + final Set<TopicsInfo> allTopicsInfo = new HashSet<>(); + final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>(); + for (final Map.Entry<String, Collection<TopicsInfo>> topology : topicGroups.entrySet()) { + final String topologyName = topology.getKey(); + final Set<String> missingSourceTopicsPerTopology = new HashSet<>(); + final Map<String, InternalTopicConfig> repartitionTopicConfigsPerTopology = new HashMap<>(); + for (final TopicsInfo topicsInfo : topology.getValue()) { + 
missingSourceTopicsPerTopology.addAll(computeMissingExternalSourceTopics(topicsInfo, clusterMetadata)); + repartitionTopicConfigsPerTopology.putAll( + topicsInfo.repartitionSourceTopics + .values() + .stream() + .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig))); + } + if (missingSourceTopicsPerTopology.isEmpty()) { + allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology); + allTopicsInfo.addAll(topology.getValue()); + } else { + topologiesWithMissingSourceTopics.put(topologyName, missingSourceTopicsPerTopology); + log.error("Topology {} was missing source topics {} and will be excluded from the current assignment, " + + "this can be due to the consumer client's metadata being stale or because they have " + + "not been created yet. Please verify that you have created all input topics. When the " + + "metadata is updated a new rebalance will be kicked off automatically and the topology " + + "will be retried at that time.", topologyName, missingSourceTopicsPerTopology); Review comment: That's what I was trying to get across, but I can try to be more clear -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org