wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r497081251



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -570,6 +581,49 @@ void runLoop() {
         }
     }
 
+    public void setStreamsUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, final KafkaStreams kafkaStreams) {
+        this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
+        this.kafkaStreams = kafkaStreams;
+    }
+
+    private void handleStreamsUncaughtException(final Exception e) {
+        final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = this.streamsUncaughtExceptionHandler.handle(e);
+        if (kafkaStreams == null) {
+            log.error("Encountered the following exception during processing " +
+                    "and the thread is going to shut down: ", e);
+            return;
+        }
+        switch (action) {
+            case SHUTDOWN_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the thread is going to shut down: ", e);
+                break;
+            case REPLACE_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the stream thread will be replaced: ", e);
+                break;
+            case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                        "and the client is going to shut down: ", e);
+                kafkaStreams.close(Duration.ZERO);

Review comment:
       This currently leaves the client in the NOT_RUNNING state instead of ERROR, because `kafkaStreams.close(Duration.ZERO)` goes through the normal shutdown path.
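
       To make the concern concrete, here is a minimal, self-contained sketch of the two end states. It does not use the real `KafkaStreams` class: `ClientStateSketch`, its reduced `State` enum, and `closeAfterFatalError()` are hypothetical names invented for illustration, and the transitions are a simplified assumption about how a normal close behaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the client state machine; NOT the real KafkaStreams API.
public class ClientStateSketch {

    // Reduced, illustrative set of states (assumption: the real enum has more).
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private State state = State.RUNNING;
    private final List<State> history = new ArrayList<>();

    ClientStateSketch() {
        history.add(state);
    }

    private void transitionTo(final State next) {
        state = next;
        history.add(next);
    }

    // Models a normal close: the client ends up in NOT_RUNNING,
    // which hides the fact that an exception triggered the shutdown.
    void close() {
        transitionTo(State.PENDING_SHUTDOWN);
        transitionTo(State.NOT_RUNNING);
    }

    // Hypothetical alternative: a shutdown requested by the exception
    // handler ends in ERROR so that callers can tell the two cases apart.
    void closeAfterFatalError() {
        transitionTo(State.PENDING_SHUTDOWN);
        transitionTo(State.ERROR);
    }

    State state() {
        return state;
    }

    List<State> history() {
        return history;
    }

    public static void main(final String[] args) {
        final ClientStateSketch normal = new ClientStateSketch();
        normal.close();
        System.out.println("normal close:     " + normal.history());

        final ClientStateSketch failed = new ClientStateSketch();
        failed.closeAfterFatalError();
        System.out.println("handler shutdown: " + failed.history());
    }
}
```

       With a distinct terminal state, code that watches the client state could distinguish a user-initiated close from a shutdown caused by an uncaught exception. Whether that is done through a separate close path or a flag is left open here.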




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

