ableegoldman commented on a change in pull request #10387: URL: https://github.com/apache/kafka/pull/10387#discussion_r602525060
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -492,22 +492,18 @@ private void handleStreamsUncaughtException(final Throwable throwable, closeToError(); break; case SHUTDOWN_APPLICATION: + if (getNumLiveStreamThreads() <= 1) { Review comment: You can use `StreamsBuilder#addGlobalStore` to inject an error, this method has a required parameter for a Processor which you're supposed to use to insert the data into the store (tbh no idea why you have to implement this yourself, it's caused a LOT of problems since it's unclear what you can/can't do with this -- but that's a whole other can of worms). Adding a thread is probably a fine workaround for now, it clearly answered the question, but imo we should just go ahead and use the `StreamsBuilder#addGlobalStore` + injected error to decouple the test from the way we currently happen to handle this. Then we won't have to rewrite it if we ever change something. Regarding the IllegalSTateException: wouldn't the thread hit this `IllegalStateException` before it got the chance to send the shutdown signal in a rebalance? IIRC the `subscribe` that's throwing is at the very beginning of the StreamThread loop. Also, I take it your test is passing but I wonder why we don't get stuck in an endless cycle? The new thread hits an unexpected exception and invokes the handler, SHUTDOWN_APPLICATION notices that this is the last StreamThread so it starts a new one, and so on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org