ableegoldman commented on a change in pull request #10387:
URL: https://github.com/apache/kafka/pull/10387#discussion_r602525060



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
+                if (getNumLiveStreamThreads() <= 1) {

Review comment:
       You can use `StreamsBuilder#addGlobalStore` to inject an error: this 
method has a required parameter for a Processor, which you're supposed to use to 
insert the data into the store (tbh no idea why you have to implement this 
yourself, it's caused a LOT of problems since it's unclear what you can/can't 
do with it -- but that's a whole other can of worms). 
   
   Adding a thread is probably a fine workaround for now, and it clearly answered 
the question, but imo we should just go ahead and use 
`StreamsBuilder#addGlobalStore` + an injected error to decouple the test from the 
way we currently happen to handle this. Then we won't have to rewrite it if we 
ever change something.
   
   Regarding the `IllegalStateException`: wouldn't the thread hit this 
exception before it got the chance to send the shutdown signal in 
a rebalance? IIRC the `subscribe` that's throwing is at the very beginning of 
the StreamThread loop. And unfortunately I think this may actually result in an 
endless cycle -- the new thread hits an unexpected exception and invokes the 
handler, SHUTDOWN_APPLICATION notices that this is the last StreamThread so it 
starts a new one, and so on.
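
   To make the suspected cycle concrete, here is a toy model in plain Java. It 
is only a sketch under stated assumptions, not the KafkaStreams code: 
`RestartLoopModel`, `runThread`, `simulate` and the `maxRestarts` cap are all 
hypothetical names, and every thread is assumed to fail right at start-up.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these names exist in Kafka Streams. */
final class RestartLoopModel {

    /** Stand-in for a StreamThread whose start-up fails on the first loop iteration. */
    static void runThread(final int id) {
        // Assumption: the failure happens before any rebalance could carry a shutdown signal.
        throw new IllegalStateException("thread-" + id + ": subscribe failed");
    }

    /**
     * Stand-in for a handler that replaces the last live thread on SHUTDOWN_APPLICATION.
     * maxRestarts is an artificial cap so the demo terminates; the scenario being
     * discussed has no such cap.
     */
    static List<String> simulate(final int maxRestarts) {
        final List<String> events = new ArrayList<>();
        int restarts = 0;
        int threadId = 0; // thread-0 plays the role of the last remaining thread
        while (true) {
            try {
                runThread(threadId);
                events.add("thread-" + threadId + " sent shutdown signal");
                return events; // never reached in this model
            } catch (final IllegalStateException e) {
                events.add("handler invoked: " + e.getMessage());
            }
            if (restarts == maxRestarts) {
                events.add("cap reached after " + restarts + " restarts");
                return events;
            }
            // Handler logic in the model: this was the last live thread, so start a replacement.
            restarts++;
            threadId++;
            events.add("started replacement thread-" + threadId);
        }
    }

    public static void main(final String[] args) {
        for (final String event : simulate(3)) {
            System.out.println(event);
        }
    }
}
```

   In this model no thread ever reaches the "sent shutdown signal" step, so 
without the cap the handler would keep starting replacements forever.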




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

