ableegoldman commented on a change in pull request #11600: URL: https://github.com/apache/kafka/pull/11600#discussion_r786448672
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -488,23 +492,37 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, } /** - * Computes and assembles all repartition topic metadata then creates the topics if necessary. + * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies + * that all user input topics of each topology have been created ahead of time. If any such source topics are + * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created + * and invoke the exception handler (without killing the thread) once for each topology to alert the user of + * the missing topics. + * <p> + * For regular applications without named topologies, the assignor will instead send a shutdown signal to + * all clients so the user can identify and resolve the problem. * - * @return map from repartition topic to its partition info + * @return application metadata such as partition info of repartition topics, missing external topics, etc */ - private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) { - + private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) { final RepartitionTopics repartitionTopics = new RepartitionTopics( taskManager.topologyMetadata(), internalTopicManager, copartitionedTopicsEnforcer, metadata, logPrefix ); - repartitionTopics.setup(); - return repartitionTopics.topicPartitionsInfo(); + final boolean isMissingInputTopics = !repartitionTopics.setup(); + if (isMissingInputTopics) { + if (!taskManager.topologyMetadata().hasNamedTopologies()) { + throw new MissingSourceTopicException("Missing source topics."); + } else { + + } Review comment: I was working on debugging a test before pushing the changes to invoke the uncaught exception handler, I think I'm just going to split that out into a separate PR after all. I'll clean this up for this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org