cadonna commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r788604115
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -97,49 +101,74 @@ public boolean setup() {
}
}
}
+ }
- return missingUserInputTopicsPerTopology.isEmpty();
+ public Set<String> topologiesWithMissingInputTopics() {
+ return missingInputTopicsBySubtopology.keySet()
+ .stream()
+ .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology))
+ .collect(Collectors.toSet());
}
- public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
- return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
+ public Queue<StreamsException> missingSourceTopicExceptions() {
+ return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
+ final Set<String> missingSourceTopics = entry.getValue();
+ final int subtopologyId = entry.getKey().nodeGroupId;
+ final String topologyName = entry.getKey().namedTopology;
+
+ return new StreamsException(
+ new MissingSourceTopicException(String.format(
+ "Missing source topics %s for subtopology %s of topology %s",
+ missingSourceTopics, subtopologyId, topologyName)),
+ new TaskId(subtopologyId, 0, topologyName));
Review comment:
My point was that I could not see where the task ID is used when this
exception is handled in this specific case, and a `StreamsException` can also
be created without a task ID. So I did not see a reason to introduce a dummy
task ID if the exception does not strictly need one. I am also fine with your
current solution.
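
To illustrate the alternative, here is a minimal sketch that wraps the cause without a task ID. It assumes stand-in classes: `TaskIdStandIn` and `StreamsExceptionStandIn` are hypothetical and only mirror the idea that the task ID on the exception is optional. `IllegalStateException` stands in for `MissingSourceTopicException`. This is not the actual Kafka code.

```java
import java.util.Optional;

public class OptionalTaskIdSketch {

    // Hypothetical stand-in for a task ID (subtopology, partition, topology name).
    static final class TaskIdStandIn {
        final int subtopologyId;
        final int partition;
        final String topologyName;

        TaskIdStandIn(final int subtopologyId, final int partition, final String topologyName) {
            this.subtopologyId = subtopologyId;
            this.partition = partition;
            this.topologyName = topologyName;
        }
    }

    // Hypothetical stand-in for the exception: the task ID is optional.
    static class StreamsExceptionStandIn extends RuntimeException {
        private final TaskIdStandIn taskId;

        // Variant without a task ID.
        StreamsExceptionStandIn(final Throwable cause) {
            super(cause);
            this.taskId = null;
        }

        // Variant with a task ID, as in the diff above.
        StreamsExceptionStandIn(final Throwable cause, final TaskIdStandIn taskId) {
            super(cause);
            this.taskId = taskId;
        }

        Optional<TaskIdStandIn> taskId() {
            return Optional.ofNullable(taskId);
        }
    }

    // Builds the exception without a dummy task ID; the subtopology and
    // topology name are still carried in the message of the cause.
    static StreamsExceptionStandIn missingSourceTopics(final String topics,
                                                       final int subtopologyId,
                                                       final String topologyName) {
        final String message = String.format(
            "Missing source topics %s for subtopology %s of topology %s",
            topics, subtopologyId, topologyName);
        return new StreamsExceptionStandIn(new IllegalStateException(message));
    }

    public static void main(final String[] args) {
        final StreamsExceptionStandIn e = missingSourceTopics("[input-a]", 1, "topology-1");
        System.out.println(e.taskId().isPresent());
        System.out.println(e.getCause().getMessage());
    }
}
```

With this shape, a handler that never reads the task ID loses nothing, and the subtopology is still identifiable from the message.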