cadonna commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r786702599



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
##########
@@ -162,14 +169,17 @@ public void shouldThrowMissingSourceTopicException() {
             "[test] "
         );
 
-        assertThrows(MissingSourceTopicException.class, repartitionTopics::setup);
+        assertThat(repartitionTopics.setup(), equalTo(false));

Review comment:
       Could you also verify in the other unit tests that `setup()` returns 
`true` when no source topics are missing?
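
A minimal sketch of the kind of check I have in mind. `SetupReturnValueSketch` is a hypothetical stand-in that only mirrors the boolean contract of `setup()` from this PR; it is not the real `RepartitionTopics`, and the constructor arguments are assumptions made for illustration:

```java
import java.util.Set;

// Hypothetical stand-in for RepartitionTopics, reduced to the boolean
// contract of setup() in this PR. Not the real Kafka class.
public class SetupReturnValueSketch {
    private final Set<String> sourceTopics;
    private final Set<String> existingTopics;

    public SetupReturnValueSketch(final Set<String> sourceTopics,
                                  final Set<String> existingTopics) {
        this.sourceTopics = sourceTopics;
        this.existingTopics = existingTopics;
    }

    // Returns true only if every source topic already exists.
    public boolean setup() {
        return existingTopics.containsAll(sourceTopics);
    }
}
```

In the real tests this would presumably be the counterpart of the assertion above, i.e. `assertThat(repartitionTopics.setup(), equalTo(true));` in each test where all source topics exist.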

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -488,21 +492,32 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     }
 
     /**
-     * Computes and assembles all repartition topic metadata then creates the topics if necessary.
+     * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
+     * that all user input topics of each topology have been created ahead of time. If any such source topics are
+     * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
+     * and invoke the exception handler (without killing the thread) once for each topology to alert the user of
+     * the missing topics.
+     * <p>
+     * For regular applications without named topologies, the assignor will instead send a shutdown signal to
+     * all clients so the user can identify and resolve the problem.
      *
-     * @return map from repartition topic to its partition info
+     * @return application metadata such as partition info of repartition topics, missing external topics, etc
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Cluster metadata) {
-
+    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             taskManager.topologyMetadata(),
             internalTopicManager,
             copartitionedTopicsEnforcer,
             metadata,
             logPrefix
         );
-        repartitionTopics.setup();
-        return repartitionTopics.topicPartitionsInfo();
+        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        if (isMissingInputTopics) {
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source topics.");
+            }
+        }

Review comment:
       This is just a nit that you might consider in the follow-up PR: I would 
not return a boolean from `setup()`, since the setup actually succeeds even if 
the return value is `false`. Instead, I would check the return value of 
`RepartitionTopics#missingUserInputTopicsPerTopology()`. You could even move 
all these checks into `RepartitionTopics`. I think it would make the code a bit 
cleaner.
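
Roughly what I mean, as a sketch only. `RepartitionTopicsSketch` is a simplified, hypothetical stand-in and not the real class: the constructor arguments, the `Map<String, Set<String>>` return type of `missingUserInputTopicsPerTopology()`, and the `throwIfMissingInputTopics` helper are assumptions for illustration, and `IllegalStateException` stands in for `MissingSourceTopicException` so the example compiles on its own:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for RepartitionTopics; not the real Kafka class.
public class RepartitionTopicsSketch {
    private final Map<String, Set<String>> sourceTopicsPerTopology;
    private final Set<String> existingTopics;
    private final Map<String, Set<String>> missingPerTopology = new HashMap<>();

    public RepartitionTopicsSketch(final Map<String, Set<String>> sourceTopicsPerTopology,
                                   final Set<String> existingTopics) {
        this.sourceTopicsPerTopology = sourceTopicsPerTopology;
        this.existingTopics = existingTopics;
    }

    // setup() no longer returns a boolean; it only records what it found.
    public void setup() {
        missingPerTopology.clear();
        for (final Map.Entry<String, Set<String>> entry : sourceTopicsPerTopology.entrySet()) {
            final Set<String> missing = new TreeSet<>(entry.getValue());
            missing.removeAll(existingTopics);
            if (!missing.isEmpty()) {
                missingPerTopology.put(entry.getKey(), missing);
            }
        }
    }

    // Method name taken from the comment above; the return type is assumed.
    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
        return Collections.unmodifiableMap(missingPerTopology);
    }

    // Hypothetical home for the check that currently lives in the assignor.
    // IllegalStateException stands in for MissingSourceTopicException.
    public void throwIfMissingInputTopics(final boolean hasNamedTopologies) {
        if (!missingPerTopology.isEmpty() && !hasNamedTopologies) {
            throw new IllegalStateException("Missing source topics: " + missingPerTopology);
        }
    }
}
```

With something along these lines the assignor would call `setup()` and then either inspect `missingUserInputTopicsPerTopology()` itself or delegate the decision to `RepartitionTopics`, so a `false` return value no longer has to be read as "setup succeeded, but topics are missing".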



