wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r497080111



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+    /**
+     * Inspect the exception received and decide how to respond.
+     * @param exception the actual exception
+     */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);
+
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum StreamsUncaughtExceptionHandlerResponse {
+
+
+        SHUTDOWN_STREAM_THREAD(0, "SHUTDOWN_STREAM_THREAD"),
+        REPLACE_STREAM_THREAD(1, "REPLACE_STREAM_THREAD"),

Review comment:
       This may need to be removed until 663 gets merged.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -570,6 +581,49 @@ void runLoop() {
         }
     }
 
+    public void setStreamsUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, final KafkaStreams kafkaStreams) {
+        this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
+        this.kafkaStreams = kafkaStreams;
+    }
+
+    private void handleStreamsUncaughtException(final Exception e) {
+        final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = this.streamsUncaughtExceptionHandler.handle(e);
+        if (kafkaStreams == null) {
+            log.error("Encountered the following exception during processing " +
+                    "and the thread is going to shut down: ", e);
+            return;
+        }
+        switch (action) {
+            case SHUTDOWN_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the thread is going to shut down: ", e);
+                break;
+            case REPLACE_STREAM_THREAD:

Review comment:
       This is dependent on 663.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -54,6 +55,9 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
             log.error("Received error code {}", assignmentErrorCode.get());
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            //throw new ShutdownRequestedException("onPartition assigned"); //TODO: receive request and call requestClose()
+            streamThread.kafkaStreams.close(Duration.ZERO);

Review comment:
       This call is blocking; we need to find a non-blocking alternative.
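
   One possible direction, as a rough sketch only: hand the blocking close off to a dedicated thread so the caller (here, the rebalance callback) returns immediately. `AsyncCloser`, `requestClose`, and `awaitClosed` are hypothetical names for illustration, not existing Kafka Streams APIs, and the sketch assumes the blocking close can be wrapped in a `Runnable`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Kafka Streams: runs a blocking close
// on its own thread so that the requesting thread is never blocked.
final class AsyncCloser {
    private final Runnable blockingClose;
    private final AtomicBoolean requested = new AtomicBoolean(false);
    private final CountDownLatch done = new CountDownLatch(1);

    AsyncCloser(final Runnable blockingClose) {
        this.blockingClose = blockingClose;
    }

    // Returns true if this call started the shutdown,
    // false if a shutdown had already been requested.
    boolean requestClose() {
        if (!requested.compareAndSet(false, true)) {
            return false;
        }
        final Thread closer = new Thread(() -> {
            try {
                blockingClose.run();
            } finally {
                done.countDown();
            }
        }, "async-closer");
        closer.setDaemon(true);
        closer.start();
        return true;
    }

    // Lets a caller that is allowed to block wait for the close to finish.
    boolean awaitClosed(final long timeoutMs) {
        try {
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

   With something like this, the listener would call `requestClose()` and return, and the actual close would proceed in the background.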

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -529,8 +541,7 @@ public void run() {
                 }
             }
 
-            log.error("Encountered the following exception during processing " +
-                "and the thread is going to shut down: ", e);
+            handleStreamsUncaughtException(e);

Review comment:
       We need to verify that the new handler is set.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -570,6 +581,49 @@ void runLoop() {
         }
     }
 
+    public void setStreamsUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, final KafkaStreams kafkaStreams) {

Review comment:
       I am looking into using a wrapper and a callback so that we do not have to introduce a KafkaStreams object here, but until I figure that out this will be a stopgap.
   
   However, that will not work in the rebalance listener, since it needs the reference as well. Maybe this is the only way; I don't know yet. @cadonna WDYT?
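
   To make the wrapper-and-callback idea concrete, here is a minimal sketch under simplifying assumptions. All types (`SketchResponse`, `SketchUncaughtHandler`, `SketchClient`, `SketchWorker`) are hypothetical stand-ins for the PR's classes, and the actions are only recorded as strings rather than performed. The worker holds a callback, not a reference to the client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the response enum in this PR.
enum SketchResponse { SHUTDOWN_STREAM_THREAD, SHUTDOWN_KAFKA_STREAMS_CLIENT }

// Hypothetical stand-in for StreamsUncaughtExceptionHandler.
interface SketchUncaughtHandler {
    SketchResponse handle(Exception exception);
}

// Stand-in for KafkaStreams: owns the user handler and decides
// what each response means. Not thread-safe; single-threaded sketch.
final class SketchClient {
    final List<String> actions = new ArrayList<>();
    private final SketchUncaughtHandler userHandler;

    SketchClient(final SketchUncaughtHandler userHandler) {
        this.userHandler = userHandler;
    }

    // The wrapper: a worker only ever sees this callback.
    Consumer<Exception> exceptionCallback() {
        return e -> {
            final SketchResponse response = userHandler.handle(e);
            switch (response) {
                case SHUTDOWN_STREAM_THREAD:
                    actions.add("thread-shutdown");
                    break;
                case SHUTDOWN_KAFKA_STREAMS_CLIENT:
                    actions.add("client-shutdown");
                    break;
            }
        };
    }
}

// Stand-in for StreamThread: depends on a callback instead of the client.
final class SketchWorker {
    private final Consumer<Exception> onUncaught;

    SketchWorker(final Consumer<Exception> onUncaught) {
        this.onUncaught = onUncaught;
    }

    void runOnce(final Runnable task) {
        try {
            task.run();
        } catch (final RuntimeException e) {
            onUncaught.accept(e);
        }
    }
}
```

   The same callback could in principle be handed to the rebalance listener, which would avoid the second direct reference, though whether that fits the real classes is an open question.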

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -570,6 +581,49 @@ void runLoop() {
         }
     }
 
+    public void setStreamsUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, final KafkaStreams kafkaStreams) {
+        this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
+        this.kafkaStreams = kafkaStreams;
+    }
+
+    private void handleStreamsUncaughtException(final Exception e) {
+        final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = this.streamsUncaughtExceptionHandler.handle(e);
+        if (kafkaStreams == null) {
+            log.error("Encountered the following exception during processing " +
+                    "and the thread is going to shut down: ", e);
+            return;
+        }
+        switch (action) {
+            case SHUTDOWN_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the thread is going to shut down: ", e);
+                break;
+            case REPLACE_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                        "and the stream thread will be replaced: ", e);
+                break;
+            case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                        "and the client is going to shut down: ", e);
+                kafkaStreams.close(Duration.ZERO);

Review comment:
       This currently leaves the client in the NOT_RUNNING state instead of ERROR. We need a close that leaves the client in ERROR and is non-blocking.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -529,8 +541,7 @@ public void run() {
                 }
             }
 
-            log.error("Encountered the following exception during processing " +
-                "and the thread is going to shut down: ", e);
+            handleStreamsUncaughtException(e);
             throw e;

Review comment:
       This will trigger the old uncaught exception handler.
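
   For reference, a small standalone sketch of the underlying JVM behavior: an exception rethrown out of a thread's body reaches that thread's `Thread.UncaughtExceptionHandler`, even if the body already handled it once. `RethrowDemo` and `observedByThreadHandler` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not Kafka Streams code.
final class RethrowDemo {
    // Runs a thread whose body catches a failure and optionally rethrows it.
    // Returns the message seen by the thread's uncaught exception handler,
    // or "nothing" if that handler was never invoked.
    static String observedByThreadHandler(final boolean rethrow) {
        final AtomicReference<String> seen = new AtomicReference<>("nothing");
        final Thread t = new Thread(() -> {
            try {
                throw new IllegalStateException("processing failed");
            } catch (final RuntimeException e) {
                // A new-style handler would be consulted here.
                if (rethrow) {
                    throw e; // escapes run() and reaches the thread-level handler
                }
            }
        });
        t.setUncaughtExceptionHandler((thread, ex) -> seen.set(ex.getMessage()));
        t.start();
        try {
            t.join();
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```

   In this sketch, rethrowing means both handling paths run for the same exception, which is the double-trigger this comment is about.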

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -364,6 +369,31 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {

Review comment:
       When we set a new handler, should we reset the old one so that both cannot be triggered?
   The same applies the other way around.
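
   One way the reset could look, sketched with hypothetical names (`HandlerRegistry`, `setLegacyHandler`, `setStreamsHandler`, `dispatch`); the new-style handler is simplified to a `Function<Exception, String>` for illustration. Installing either kind of handler clears the other, so at most one can fire.

```java
import java.util.function.Function;

// Hypothetical holder, not Kafka Streams code: keeps the two handler
// kinds mutually exclusive by clearing one whenever the other is set.
final class HandlerRegistry {
    private Thread.UncaughtExceptionHandler legacyHandler;
    private Function<Exception, String> streamsHandler;

    synchronized void setLegacyHandler(final Thread.UncaughtExceptionHandler handler) {
        this.legacyHandler = handler;
        this.streamsHandler = null; // reset the new-style handler
    }

    synchronized void setStreamsHandler(final Function<Exception, String> handler) {
        this.streamsHandler = handler;
        this.legacyHandler = null; // reset the old-style handler
    }

    // Dispatches to whichever handler is installed and reports which one ran.
    synchronized String dispatch(final Exception e) {
        if (streamsHandler != null) {
            return "streams:" + streamsHandler.apply(e);
        }
        if (legacyHandler != null) {
            legacyHandler.uncaughtException(Thread.currentThread(), e);
            return "legacy";
        }
        return "none";
    }
}
```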





