chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542822737


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final 
Cluster metadata) {
         final boolean isMissingInputTopics = 
!repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source 
topics.");

Review Comment:
   <img width="948" alt="image" 
src="https://github.com/apache/kafka/assets/90125071/c00ef63e-c20b-4edf-9795-7b8b2b314975";>
   I draw image above to describe `idea for improve`. 
   If you want to read `MissingSourceTopics` in StreamRebalanceListener as 
well, this workaround seems like it could be a way to handle it.
   
   1. As you know, `ThreadLocal` provides storage specific to each thread.
   2. Both `StreamThread`, `StreamRebalanceListener`, `StreamPartitionAssignor` 
are included on internal package. that means, it is not public API.
   3. `StreamRebalanceListener` has reference of `StreamThread` Already. 
   
   
   
https://github.com/apache/kafka/blob/4ccbf1634afb063615616b5995ef279a063fbeab/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L622
   From this,  I believe one `StreamThread` has own `StreamRebalanceListener` 
instance. thsu, `ThreadLocal` is suitable workaround in this case, i believe. 
   
   Thus, all the things that we should do, are 3step.
   1. Add `ThreadLocal` to field of `StreamThread`.
   2. Add method that put `missing source topics` to `ThreadLocal` before throw 
`MissingSourceTopicExceptions`, 
   3. Add some codes on `StreamRebalanceListener` to get `MissingSourceTopics` 
from `ThreadLocal`.
   
   Does it make sense to you? 
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to