ableegoldman commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r789243445



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -97,49 +101,74 @@ public boolean setup() {
                 }
             }
         }
+    }
 
-        return missingUserInputTopicsPerTopology.isEmpty();
+    public Set<String> topologiesWithMissingInputTopics() {
+        return missingInputTopicsBySubtopology.keySet()
+            .stream()
+            .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology))
+            .collect(Collectors.toSet());
     }
 
-    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
-        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
+    public Queue<StreamsException> missingSourceTopicExceptions() {
+        return missingInputTopicsBySubtopology.entrySet().stream().map(entry 
-> {
+            final Set<String> missingSourceTopics = entry.getValue();
+            final int subtopologyId = entry.getKey().nodeGroupId;
+            final String topologyName = entry.getKey().namedTopology;
+
+            return new StreamsException(
+                new MissingSourceTopicException(String.format(
+                    "Missing source topics %s for subtopology %s of topology 
%s",
+                    missingSourceTopics, subtopologyId, topologyName)),
+                new TaskId(subtopologyId, 0, topologyName));

Review comment:
       We want to add a TaskId because we want to provide information about 
which topology is missing the source topics, and to a lesser extent which 
subtopology. If we create the StreamsException without a TaskId then it's 
considerably more difficult to figure out which topology (or which query) is 
experiencing the issue -- technically you could parse the error message in the 
MissingSourceTopicException but that's obviously very brittle and just not a 
good idea. Hence, we provide the `topologyName` through a "dummy" TaskId




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to