ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r786448672



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -488,23 +492,37 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     }
 
     /**
-     * Computes and assembles all repartition topic metadata then creates the topics if necessary.
+     * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
+     * that all user input topics of each topology have been created ahead of time. If any such source topics are
+     * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
+     * and invoke the exception handler (without killing the thread) once for each topology to alert the user of
+     * the missing topics.
+     * <p>
+     * For regular applications without named topologies, the assignor will instead send a shutdown signal to
+     * all clients so the user can identify and resolve the problem.
      *
-     * @return map from repartition topic to its partition info
+     * @return application metadata such as partition info of repartition topics, missing external topics, etc
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) {
-
+    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             taskManager.topologyMetadata(),
             internalTopicManager,
             copartitionedTopicsEnforcer,
             metadata,
             logPrefix
         );
-        repartitionTopics.setup();
-        return repartitionTopics.topicPartitionsInfo();
+        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        if (isMissingInputTopics) {
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source topics.");
+            } else {
+
+            }

Review comment:
       I was in the middle of debugging a test before pushing the changes to
invoke the uncaught exception handler. I think I'm just going to split that out
into a separate PR after all, so I'll clean this up for this PR.
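
For reference, here is a rough, self-contained sketch of the branching that the
Javadoc in this hunk describes. It is not the code from this PR: the class
name, the `topologiesToSkip` helper, and the map of missing topics per topology
are all hypothetical, and the nested exception is only a stand-in for Kafka's
MissingSourceTopicException. Invoking the exception handler is left out, since
that part is moving to a separate PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch only: none of these names are the real Kafka Streams classes.
class MissingSourceTopicsSketch {

    // Stand-in for Kafka's MissingSourceTopicException.
    static class MissingSourceTopicException extends RuntimeException {
        MissingSourceTopicException(final String message) {
            super(message);
        }
    }

    /**
     * Returns the topologies whose tasks should be skipped because at least one
     * of their source topics is missing. Without named topologies, any missing
     * source topic raises an exception instead.
     */
    static List<String> topologiesToSkip(final Map<String, Set<String>> missingTopicsByTopology,
                                         final boolean hasNamedTopologies) {
        final List<String> skipped = new ArrayList<>();
        // Copy into a TreeMap so the result has a deterministic (sorted) order.
        for (final Map.Entry<String, Set<String>> entry : new TreeMap<>(missingTopicsByTopology).entrySet()) {
            if (!entry.getValue().isEmpty()) {
                skipped.add(entry.getKey());
            }
        }
        if (!skipped.isEmpty() && !hasNamedTopologies) {
            throw new MissingSourceTopicException("Missing source topics.");
        }
        return skipped;
    }
}
```

With named topologies the caller would get back the list of topologies to hold
off on; without them the exception propagates so the problem surfaces right
away.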




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


