This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4835c64 KAFKA-12887 Skip some RuntimeExceptions from exception
handler (#11228)
4835c64 is described below
commit 4835c64f89876db5af2bba8fa3ef17de5a0b44e3
Author: Josep Prat <[email protected]>
AuthorDate: Wed Sep 1 18:58:36 2021 +0200
KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)
Instead of letting all RuntimeExceptions go through and be processed by the
uncaught exception handler, IllegalStateException and IllegalArgumentException
are not passed through and fail fast. In this PR when setting the uncaught
exception handler we check if the exception is in an "exclude list", if so, we
terminate the client, otherwise we continue as usual.
Added test checking this new case. Added integration test checking that
user defined exception handler is not used when an IllegalStateException is
thrown.
Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang
<[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 22 +++++++-
.../integration/EmitOnChangeIntegrationTest.java | 2 +-
...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++++++++++++++++++---
.../processor/internals/StreamThreadTest.java | 4 +-
4 files changed, 78 insertions(+), 11 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index eee55fc..5067da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -141,6 +141,9 @@ public class KafkaStreams implements AutoCloseable {
private static final String JMX_PREFIX = "kafka.streams";
+ private static final Set<Class<? extends Throwable>>
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
+ new HashSet<>(Arrays.asList(IllegalStateException.class,
IllegalArgumentException.class));
+
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
// of the co-location of stream thread's consumers. It is for internal
@@ -495,9 +498,24 @@ public class KafkaStreams implements AutoCloseable {
}
}
+ private boolean wrappedExceptionIsIn(final Throwable throwable, final
Set<Class<? extends Throwable>> exceptionsOfInterest) {
+ return throwable.getCause() != null &&
exceptionsOfInterest.contains(throwable.getCause().getClass());
+ }
+
+ private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
getActionForThrowable(final Throwable throwable,
+
final StreamsUncaughtExceptionHandler
streamsUncaughtExceptionHandler) {
+ final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
action;
+ if (wrappedExceptionIsIn(throwable,
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
+ action = SHUTDOWN_CLIENT;
+ } else {
+ action = streamsUncaughtExceptionHandler.handle(throwable);
+ }
+ return action;
+ }
+
private void handleStreamsUncaughtException(final Throwable throwable,
final
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
- final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
action = streamsUncaughtExceptionHandler.handle(throwable);
+ final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
if (oldHandler) {
log.warn("Stream's new uncaught exception handler is set as well
as the deprecated old handler." +
"The old handler will be ignored as long as a new handler
is set.");
@@ -509,7 +527,7 @@ public class KafkaStreams implements AutoCloseable {
break;
case SHUTDOWN_CLIENT:
log.error("Encountered the following exception during
processing " +
- "and the registered exception handler opted to " +
action + "." +
+ "and Kafka Streams opted to " + action + "." +
" The streams client is going to shut down now. ",
throwable);
closeToError();
break;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 8bc5e52..63e0f27 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -104,7 +104,7 @@ public class EmitOnChangeIntegrationTest {
.toStream()
.map((key, value) -> {
if (shouldThrow.compareAndSet(true, false)) {
- throw new IllegalStateException("Kaboom");
+ throw new RuntimeException("Kaboom");
} else {
return new KeyValue<>(key, value);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 97d894f..74d6ba9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -93,7 +93,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
private static Properties properties;
private static List<String> processorValueCollector;
private static String appId = "";
- private static AtomicBoolean throwError = new AtomicBoolean(true);
+ private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
+ private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new
AtomicBoolean(false);
+ private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new
AtomicBoolean(false);
@Before
public void setup() {
@@ -163,6 +165,47 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
}
}
+
+ @Test
+ public void shouldShutdownClientWhenIllegalStateException() throws
InterruptedException {
+ THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
+ try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
+ kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
+
+ kafkaStreams.setUncaughtExceptionHandler(exception ->
REPLACE_THREAD); // if the user defined uncaught exception handler would be hit
we would be replacing the thread
+
+
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+ produceMessages(0L, inputTopic, "A");
+ waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
+
+ assertThat(processorValueCollector.size(), equalTo(1));
+ } finally {
+ THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
+ }
+
+ }
+
+ @Test
+ public void shouldShutdownClientWhenIllegalArgumentException() throws
InterruptedException {
+ THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
+ try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
+ kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
+
+ kafkaStreams.setUncaughtExceptionHandler(exception ->
REPLACE_THREAD); // if the user defined uncaught exception handler would be hit
we would be replacing the thread
+
+
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+ produceMessages(0L, inputTopic, "A");
+ waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
+
+ assertThat(processorValueCollector.size(), equalTo(1));
+ } finally {
+ THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
+ }
+
+ }
+
@Test
public void shouldReplaceThreads() throws InterruptedException {
testReplaceThreads(2);
@@ -235,10 +278,16 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
@Override
public void process(final String key, final String value) {
valueList.add(value + " " + context.taskId());
- if (throwError.get()) {
- throw new StreamsException(Thread.currentThread().getName());
+ if (THROW_ERROR.get()) {
+ if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
+ throw new IllegalStateException("Something unexpected
happened in " + Thread.currentThread().getName());
+ } else if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
+ throw new IllegalArgumentException("Something unexpected
happened in " + Thread.currentThread().getName());
+ } else {
+ throw new
StreamsException(Thread.currentThread().getName());
+ }
}
- throwError.set(true);
+ THROW_ERROR.set(true);
}
}
@@ -272,7 +321,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
final AtomicInteger count = new AtomicInteger();
kafkaStreams.setUncaughtExceptionHandler(exception -> {
if (count.incrementAndGet() == numThreads) {
- throwError.set(false);
+ THROW_ERROR.set(false);
}
return REPLACE_THREAD;
});
@@ -280,7 +329,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
produceMessages(0L, inputTopic, "A");
TestUtils.waitForCondition(() -> count.get() == numThreads,
"finished replacing threads");
- TestUtils.waitForCondition(() -> throwError.get(), "finished
replacing threads");
+ TestUtils.waitForCondition(() -> THROW_ERROR.get(), "finished
replacing threads");
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 02b20e2..e57c565 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2336,9 +2336,9 @@ public class StreamThreadTest {
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
consumer.subscribe((Collection<String>) anyObject(), anyObject());
- EasyMock.expectLastCall().anyTimes();
+ EasyMock.expectLastCall().atLeastOnce();
consumer.unsubscribe();
- EasyMock.expectLastCall().anyTimes();
+ EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(consumerGroupMetadata);
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);