cadonna commented on a change in pull request #9697:
URL: https://github.com/apache/kafka/pull/9697#discussion_r540249465



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final 
Throwable throwable,
                     "The old handler will be ignored as long as a new handler 
is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && 
Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting 
to shutting down the client.");
+                    log.error("Encountered the following exception during 
processing " +
+                            "and the registered exception handler opted to " + 
action + "." +
+                            " The streams client is going to shut down now. ", 
throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) 
Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();
+                if (throwable instanceof RuntimeException) {
+                    throw (RuntimeException) throwable;
+                } else if (throwable instanceof Error) {
+                    throw (Error) throwable;
+                } else {
+                    throw new RuntimeException("Unexpected checked exception 
caught in the uncaught exception handler", throwable);
+                }

Review comment:
       I think it would be cleaner to extract this code to a separate method.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) 
throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws 
InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record 
kills a thread we must have replaced threads

Review comment:
       Could you please be a bit clearer in the explanatory comment? BTW, we 
execute this test also once with just one stream thread so the 2 stream threads 
in the comment are not correct. Also, wouldn't it be better to explain the 
verification in the call to `assertThat()` instead of in a comment? You can 
pass a reason to the method.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) 
throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws 
InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });

Review comment:
       I think it would be better to have a test that shows that a new thread 
that replaced a failed one, actually is able to process records. So, I would 
let the new thread process some records and then shutdown the client with a 
normal close.
   
   Maybe similar applies to the shutdown tests. First let the 
client/application process some records and then throw an exception that shuts 
down the client/application. I guess, this last paragraph is something for a 
separate PR.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -444,6 +444,25 @@ private void handleStreamsUncaughtException(final 
Throwable throwable,
                     "The old handler will be ignored as long as a new handler 
is set.");
         }
         switch (action) {
+            case REPLACE_THREAD:
+                if (globalStreamThread != null && 
Thread.currentThread().getName().equals(globalStreamThread.getName())) {
+                    log.warn("The global thread cannot be replaced. Reverting 
to shutting down the client.");
+                    log.error("Encountered the following exception during 
processing " +
+                            "and the registered exception handler opted to " + 
action + "." +
+                            " The streams client is going to shut down now. ", 
throwable);
+                    close(Duration.ZERO);
+                }
+                final StreamThread deadThread = (StreamThread) 
Thread.currentThread();
+                threads.remove(deadThread);
+                addStreamThread();
+                deadThread.shutdown();

Review comment:
       Do we need to shutdown the dead stream thread? `completeShutDown()` will 
be called anyways. 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -202,6 +213,29 @@ private void testShutdownApplication(final int numThreads) 
throws InterruptedExc
             assertThat(processorValueCollector.size(), equalTo(1));
         }
     }
+
+    private void testReplaceThreads(final int numThreads) throws 
InterruptedException {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
+
+            final AtomicInteger count = new AtomicInteger();
+            kafkaStreams.setUncaughtExceptionHandler(exception -> {
+                count.getAndIncrement();
+                if (count.get() > 2) {
+                    return SHUTDOWN_CLIENT;
+                }
+                return REPLACE_THREAD;
+            });
+            
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            produceMessages(0L, inputTopic, "A");
+            waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+
+            assertThat(processorValueCollector.size(), equalTo(3));
+            //because we only have 2 threads at the start and each record 
kills a thread we must have replaced threads
+        }
+    }
 }

Review comment:
       A test is missing for a global stream thread that calls the uncaught 
exception handler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to