mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542738987


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final 
Cluster metadata) {
         final boolean isMissingInputTopics = 
!repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source 
topics.");

Review Comment:
   Was Bruno meant was, that we need to add `log.error(...)` to log the error 
message before throwing the exception. Seems you did not add this yet?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final 
Cluster metadata) {
         final boolean isMissingInputTopics = 
!repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source 
topics.");

Review Comment:
   Adding to a previous comment from Bruno, I am wondering if we should also 
change the error log in `StreamsRebalanceListener` to point out that the 
missing source topic names might be logged on a different instance?
   
   The `StreamsRebalanceListener` is executed on every instance, but 
`StreamsPartitionAssignor` only on the group leader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to