cadonna commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r788604115



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -97,49 +101,74 @@ public boolean setup() {
                 }
             }
         }
+    }
 
-        return missingUserInputTopicsPerTopology.isEmpty();
+    public Set<String> topologiesWithMissingInputTopics() {
+        return missingInputTopicsBySubtopology.keySet()
+            .stream()
+            .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology))
+            .collect(Collectors.toSet());
     }
 
-    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
-        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
+    public Queue<StreamsException> missingSourceTopicExceptions() {
+        return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
+            final Set<String> missingSourceTopics = entry.getValue();
+            final int subtopologyId = entry.getKey().nodeGroupId;
+            final String topologyName = entry.getKey().namedTopology;
+
+            return new StreamsException(
+                new MissingSourceTopicException(String.format(
+                    "Missing source topics %s for subtopology %s of topology %s",
+                    missingSourceTopics, subtopologyId, topologyName)),
+                new TaskId(subtopologyId, 0, topologyName));

Review comment:
       My point was that I could not see where the task ID is used when this 
exception is handled, and a `StreamsException` can also be created without a 
task ID. So I did not see a reason to introduce a dummy task ID that the 
exception does not strictly need. That said, I am also fine with your current 
solution.
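
To make the two options concrete, here is a minimal, self-contained sketch. It deliberately uses stand-in classes (`SketchTaskId`, `SketchStreamsException`) rather than the real Kafka types, so every name and constructor below is an assumption for illustration only. The only point it shows is that an exception with an optional task ID can be built either way, and that a handler which never reads the task ID behaves the same in both cases.

```java
import java.util.Optional;
import java.util.Set;

public class MissingSourceTopicSketch {

    // Hypothetical stand-in for a task identifier; not the real TaskId class.
    static final class SketchTaskId {
        final int subtopologyId;
        final int partition;

        SketchTaskId(final int subtopologyId, final int partition) {
            this.subtopologyId = subtopologyId;
            this.partition = partition;
        }
    }

    // Hypothetical stand-in for an exception whose task ID is optional.
    static final class SketchStreamsException extends RuntimeException {
        private final SketchTaskId taskId; // null when no task is associated

        SketchStreamsException(final Throwable cause) {
            super(cause);
            this.taskId = null;
        }

        SketchStreamsException(final Throwable cause, final SketchTaskId taskId) {
            super(cause);
            this.taskId = taskId;
        }

        Optional<SketchTaskId> taskId() {
            return Optional.ofNullable(taskId);
        }
    }

    // Builds the underlying "missing source topics" cause shared by both options.
    private static IllegalStateException missingTopics(final Set<String> missing,
                                                       final int subtopologyId,
                                                       final String topologyName) {
        return new IllegalStateException(String.format(
            "Missing source topics %s for subtopology %s of topology %s",
            missing, subtopologyId, topologyName));
    }

    // Option A: no task ID, since no concrete task exists at this point.
    static SketchStreamsException withoutTaskId(final Set<String> missing,
                                                final int subtopologyId,
                                                final String topologyName) {
        return new SketchStreamsException(missingTopics(missing, subtopologyId, topologyName));
    }

    // Option B: a dummy task ID (partition 0) that only carries the subtopology.
    static SketchStreamsException withDummyTaskId(final Set<String> missing,
                                                  final int subtopologyId,
                                                  final String topologyName) {
        return new SketchStreamsException(
            missingTopics(missing, subtopologyId, topologyName),
            new SketchTaskId(subtopologyId, 0));
    }

    public static void main(final String[] args) {
        final SketchStreamsException a = withoutTaskId(Set.of("input"), 1, "topology-a");
        final SketchStreamsException b = withDummyTaskId(Set.of("input"), 1, "topology-a");

        // A handler that only inspects the cause cannot tell the two apart.
        System.out.println(a.getCause().getMessage());
        System.out.println(b.getCause().getMessage());
        System.out.println("A has task ID: " + a.taskId().isPresent());
        System.out.println("B has task ID: " + b.taskId().isPresent());
    }
}
```

If the handler later needs the subtopology, the dummy task ID gives it a place to read it from; if not, the variant without a task ID avoids implying that partition 0 is meaningful.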



