ableegoldman commented on a change in pull request #11686: URL: https://github.com/apache/kafka/pull/11686#discussion_r788430597
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -515,11 +518,23 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) { if (isMissingInputTopics) { if (!taskManager.topologyMetadata().hasNamedTopologies()) { throw new MissingSourceTopicException("Missing source topics."); + } else { + for (final Map.Entry<String, Set<String>> topology : repartitionTopics.missingUserInputTopicsPerTopology().entrySet()) { + final String topologyName = topology.getKey(); + final StreamsException exception = new StreamsException( + new MissingSourceTopicException(String.format( + "Missing source topics %s for topology %s", + topology.getValue(), + topologyName)), + getDummyTaskIdForTopology(topologyName)); Review comment: Ah, well I'm not sure if this is what you meant or not but now that you mention it, we should actually be able to associate the missing source topics with a specific subtopology -- although it doesn't make sense to do so for a specific partition, so the TaskId will still have to have a "dummy" partition If you were asking why we need a TaskId at all, it's because that's how we correlate an exception thrown to the uncaught exception handler with a specific named topology -- the StreamsException class now has an (optional) TaskId field -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org