cadonna commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r788563612
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -523,16 +528,21 @@ private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<
     }
     private void handleStreamsUncaughtException(final Throwable throwable,
-                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+                                                final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
+                                                final boolean skipThreadReplacement) {
         final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
         if (oldHandler) {
             log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                 "The old handler will be ignored as long as a new handler is set.");
         }
         switch (action) {
             case REPLACE_THREAD:
-                log.error("Replacing thread in the streams uncaught exception handler", throwable);
-                replaceStreamThread(throwable);
+                if (!skipThreadReplacement) {
+                    log.error("Replacing thread in the streams uncaught exception handler", throwable);
+                    replaceStreamThread(throwable);
+                } else {
+                    log.debug("Skipping thread replacement for recoverable error");
Review comment:
I agree that `REPLACE_THREAD` leaks quite a few internals. `CONTINUE`
would be a better term. After all, it doesn't matter what we do internally
to ensure that processing continues.
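
To make the naming point concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams classes: the `Response` enum, the `handle` method, and the returned action strings are all hypothetical, and `CONTINUE` is only the name proposed in this comment, not an existing constant. It just shows that one public response could cover both internal outcomes (replacing the thread, or keeping it when `skipThreadReplacement` is set).

```java
public class ContinueResponseSketch {

    // Hypothetical public-facing responses. CONTINUE stands in for
    // REPLACE_THREAD and says nothing about how processing is kept going.
    enum Response { CONTINUE, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Returns a label for the internal action taken. This is an illustration
    // only; the real handler in the diff calls replaceStreamThread(throwable).
    static String handle(final Response response, final boolean skipThreadReplacement) {
        switch (response) {
            case CONTINUE:
                // Both branches keep processing going; which one runs is an
                // internal detail the caller never needs to know about.
                return skipThreadReplacement ? "keep-thread" : "replace-thread";
            case SHUTDOWN_CLIENT:
                return "shutdown-client";
            case SHUTDOWN_APPLICATION:
                return "shutdown-application";
            default:
                throw new IllegalArgumentException("Unknown response: " + response);
        }
    }

    public static void main(final String[] args) {
        System.out.println(handle(Response.CONTINUE, false));
        System.out.println(handle(Response.CONTINUE, true));
    }
}
```

With a name like `CONTINUE`, the `skipThreadReplacement` branch would no longer contradict the name of the response that triggered it.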