cadonna commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r788604115



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -97,49 +101,74 @@ public boolean setup() {
                 }
             }
         }
+    }
 
-        return missingUserInputTopicsPerTopology.isEmpty();
+    public Set<String> topologiesWithMissingInputTopics() {
+        return missingInputTopicsBySubtopology.keySet()
+            .stream()
+            .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology))
+            .collect(Collectors.toSet());
     }
 
-    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
-        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
+    public Queue<StreamsException> missingSourceTopicExceptions() {
+        return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
+            final Set<String> missingSourceTopics = entry.getValue();
+            final int subtopologyId = entry.getKey().nodeGroupId;
+            final String topologyName = entry.getKey().namedTopology;
+
+            return new StreamsException(
+                new MissingSourceTopicException(String.format(
+                    "Missing source topics %s for subtopology %s of topology %s",
+                    missingSourceTopics, subtopologyId, topologyName)),
+                new TaskId(subtopologyId, 0, topologyName));

Review comment:
       My point was that I could not see where the task ID is used when the 
exception is handled in this specific case, and a `StreamsException` can also 
be created without a task ID. So I did not see a reason to introduce a dummy 
task ID if the exception does not strictly need it. I am also fine with your 
current solution.
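
       To illustrate the point about the optional task ID, here is a minimal,
self-contained sketch. `SketchException` is a hypothetical stand-in and not
Kafka's `StreamsException`; it only assumes the same shape, namely an exception
whose task ID is optional and can be omitted at construction.

```java
import java.util.Optional;

public class OptionalTaskIdSketch {

    // Hypothetical stand-in, NOT Kafka's StreamsException: the task ID is optional.
    public static class SketchException extends RuntimeException {
        private final String taskId; // null when no task is associated

        public SketchException(final Throwable cause) {
            super(cause);
            this.taskId = null;
        }

        public SketchException(final Throwable cause, final String taskId) {
            super(cause);
            this.taskId = taskId;
        }

        public Optional<String> taskId() {
            return Optional.ofNullable(taskId);
        }
    }

    public static void main(final String[] args) {
        final RuntimeException cause = new IllegalStateException("Missing source topics [input]");

        // No dummy ID needed: callers see an empty Optional instead.
        final SketchException withoutId = new SketchException(cause);
        // The alternative discussed above: attach a placeholder ID.
        final SketchException withDummyId = new SketchException(cause, "0_0");

        System.out.println(withoutId.taskId().isPresent());
        System.out.println(withDummyId.taskId().get());
    }
}
```

With the first constructor, a handler that never reads the task ID behaves the
same, and no placeholder value leaks into logs or error messages.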

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
##########
@@ -168,12 +171,16 @@ public void shouldReturnMissingSourceTopics() {
             clusterMetadata,
             "[test] "
         );
+        repartitionTopics.setup();
 
-        assertThat(repartitionTopics.setup(), equalTo(false));
         assertThat(
-            repartitionTopics.missingUserInputTopicsPerTopology(),
-            equalTo(Collections.singletonMap(UNNAMED_TOPOLOGY, missingSourceTopics))
+            repartitionTopics.topologiesWithMissingInputTopics(),
+            equalTo(Collections.singleton(UNNAMED_TOPOLOGY))
         );
+        final StreamsException exception = repartitionTopics.missingSourceTopicExceptions().poll();
+        assertThat(exception, notNullValue());
+        assertThat(exception.taskId().isPresent(), is(true));
+        assertThat(exception.taskId().get(), equalTo(new TaskId(0, 0)));

Review comment:
       Could you also verify that, when no input topics are missing, 
`topologiesWithMissingInputTopics()` and `missingSourceTopicExceptions()` 
return empty collections?
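
       A rough sketch of the check I have in mind follows. `MissingTopicsSketch`
is a hypothetical stand-in for the bookkeeping in `RepartitionTopics`, not the
real class: it keys the map by topology name only, uses `RuntimeException`
instead of `StreamsException`, and assumes the queue is an `ArrayDeque`. The
real test would go through `setup()` with a topology whose source topics all
exist.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the missing-topic bookkeeping; not the Kafka class.
public class MissingTopicsSketch {

    // Simplification: keyed by topology name rather than by subtopology.
    private final Map<String, Set<String>> missingInputTopicsByTopology = new HashMap<>();

    public void recordMissing(final String topologyName, final Set<String> topics) {
        missingInputTopicsByTopology
            .computeIfAbsent(topologyName, name -> new HashSet<>())
            .addAll(topics);
    }

    public Set<String> topologiesWithMissingInputTopics() {
        return new HashSet<>(missingInputTopicsByTopology.keySet());
    }

    public Queue<RuntimeException> missingSourceTopicExceptions() {
        return missingInputTopicsByTopology.entrySet().stream()
            .map(entry -> new RuntimeException(String.format(
                "Missing source topics %s for topology %s",
                entry.getValue(), entry.getKey())))
            .collect(Collectors.toCollection(ArrayDeque::new));
    }

    public static void main(final String[] args) {
        final MissingTopicsSketch sketch = new MissingTopicsSketch();

        // Nothing recorded: both views should be empty, and poll() returns null.
        if (!sketch.topologiesWithMissingInputTopics().isEmpty()) {
            throw new AssertionError("expected no topologies with missing topics");
        }
        if (!sketch.missingSourceTopicExceptions().isEmpty()) {
            throw new AssertionError("expected no missing-source-topic exceptions");
        }
        if (sketch.missingSourceTopicExceptions().poll() != null) {
            throw new AssertionError("expected poll() on an empty queue to return null");
        }
    }
}
```

In the actual test, the equivalent assertions would be that both return values
are empty after `setup()` completes without any missing source topics.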




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


