wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r497081251
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -570,6 +581,49 @@ void runLoop() { } } + public void setStreamsUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, final KafkaStreams kafkaStreams) { + this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; + this.kafkaStreams = kafkaStreams; + } + + private void handleStreamsUncaughtException(final Exception e) { + final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = this.streamsUncaughtExceptionHandler.handle(e); + if (kafkaStreams == null) { + log.error("Encountered the following exception during processing " + + "and the thread is going to shut down: ", e); + return; + } + switch (action) { + case SHUTDOWN_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the thread is going to shut down: ", e); + break; + case REPLACE_STREAM_THREAD: + log.error("Encountered the following exception during processing " + + "and the the stream thread will be replaced: ", e); + break; + case SHUTDOWN_KAFKA_STREAMS_CLIENT: + log.error("Encountered the following exception during processing " + + "and the client is going to shut down: ", e); + kafkaStreams.close(Duration.ZERO); Review comment: currently leaves client state in not running instead of error. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org