ableegoldman commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r788430597



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -515,11 +518,23 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
                 throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+                for (final Map.Entry<String, Set<String>> topology : repartitionTopics.missingUserInputTopicsPerTopology().entrySet()) {
+                    final String topologyName = topology.getKey();
+                    final StreamsException exception = new StreamsException(
+                        new MissingSourceTopicException(String.format(
+                            "Missing source topics %s for topology %s",
+                            topology.getValue(),
+                            topologyName)),
+                        getDummyTaskIdForTopology(topologyName));

Review comment:
Ah, I'm not sure whether this is what you meant, but now that you mention it,
we should actually be able to associate the missing source topics with a
specific subtopology. It doesn't make sense to associate them with a specific
partition, though, so the TaskId will still need a "dummy" partition.

If you were asking why we need a TaskId at all: it's how we correlate an
exception passed to the uncaught exception handler with a specific named
topology. The StreamsException class now has an (optional) TaskId field.
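
To make the correlation concrete, here is a minimal, self-contained sketch of the idea. It does not use the Kafka Streams classes: `SketchTaskId`, `SketchStreamsException`, `dummyTaskId`, `describe`, and the choice of `0` as the dummy partition are all hypothetical stand-ins, assumed only for illustration.

```java
import java.util.Optional;

// Hypothetical stand-in for a task id: subtopology + partition + optional topology name.
final class SketchTaskId {
    final int subtopology;
    final int partition;
    final String topologyName; // null when named topologies are not in use

    SketchTaskId(final int subtopology, final int partition, final String topologyName) {
        this.subtopology = subtopology;
        this.partition = partition;
        this.topologyName = topologyName;
    }
}

// Hypothetical stand-in for an exception that carries an optional task id.
class SketchStreamsException extends RuntimeException {
    private final SketchTaskId taskId; // may be null

    SketchStreamsException(final Throwable cause, final SketchTaskId taskId) {
        super(cause);
        this.taskId = taskId;
    }

    Optional<SketchTaskId> taskId() {
        return Optional.ofNullable(taskId);
    }
}

class DummyTaskIdSketch {
    // Assumption: the partition is a placeholder, since missing source topics
    // are not tied to any one partition.
    static final int DUMMY_PARTITION = 0;

    static SketchTaskId dummyTaskId(final String topologyName, final int subtopology) {
        return new SketchTaskId(subtopology, DUMMY_PARTITION, topologyName);
    }

    // What an uncaught exception handler might do: recover the topology name
    // from the task id attached to the exception, if there is one.
    static String describe(final Throwable error) {
        if (error instanceof SketchStreamsException) {
            final Optional<SketchTaskId> id = ((SketchStreamsException) error).taskId();
            if (id.isPresent() && id.get().topologyName != null) {
                return "topology " + id.get().topologyName;
            }
        }
        return "unknown topology";
    }

    public static void main(final String[] args) {
        final SketchStreamsException error = new SketchStreamsException(
            new IllegalStateException("Missing source topics [orders] for topology topology-a"),
            dummyTaskId("topology-a", 1));
        System.out.println(describe(error));
    }
}
```

Only the topology name (and, if desired, the subtopology) is read back from the id; the partition is never inspected, which is why a placeholder value is sufficient.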



