Donald created KAFKA-14054:
------------------------------
Summary: Unexpected client shutdown as TimeoutException is thrown
as IllegalStateException
Key: KAFKA-14054
URL: https://issues.apache.org/jira/browse/KAFKA-14054
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.1.1, 3.2.0, 3.1.0
Reporter: Donald
Re:
https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2
1) A TimeoutException is rethrown as an IllegalStateException in
{_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_},
which causes the client to shut down in
{_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}.
2) Should a TimeoutException be a recoverable error that the user is expected
to handle?
3) This issue is exposed by the change in KAFKA-12887, which was introduced in
kafka-streams 3.1.0.
*code referenced*
{code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded}
public boolean commitNeeded() {
    if (commitNeeded) {
        return true;
    } else {
        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            final TopicPartition partition = entry.getKey();
            try {
                final long offset = mainConsumer.position(partition);
                if (offset > entry.getValue() + 1) {
                    commitNeeded = true;
                    entry.setValue(offset - 1);
                }
            } catch (final TimeoutException error) {
                // the `consumer.position()` call should never block, because we know that we did process data
                // for the requested partition and thus the consumer should have a valid local position
                // that it can return immediately
                // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
                throw new IllegalStateException(error);
            } catch (final KafkaException fatal) {
                throw new StreamsException(fatal);
            }
        }
        return commitNeeded;
    }
}
{code}
{code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable}
private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
                                                                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
    if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
        action = SHUTDOWN_CLIENT;
    } else {
        action = streamsUncaughtExceptionHandler.handle(throwable);
    }
    return action;
}

private void handleStreamsUncaughtException(final Throwable throwable,
                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                            final boolean skipThreadReplacement) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
    if (oldHandler) {
        log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                "The old handler will be ignored as long as a new handler is set.");
    }
    switch (action) {
        case REPLACE_THREAD:
            if (!skipThreadReplacement) {
                log.error("Replacing thread in the streams uncaught exception handler", throwable);
                replaceStreamThread(throwable);
            } else {
                log.debug("Skipping thread replacement for recoverable error");
            }
            break;
        case SHUTDOWN_CLIENT:
            log.error("Encountered the following exception during processing " +
                    "and Kafka Streams opted to " + action + "." +
                    " The streams client is going to shut down now. ", throwable);
            closeToError();
            break;
{code}
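To illustrate why the rewrap in commitNeeded matters, here is a minimal, self-contained sketch of the decision made in getActionForThrowable. It is not the Kafka Streams implementation. The class ShutdownDecisionSketch, the StandIn* exception types, the Response enum and the decide method are hypothetical stand-ins, and the sketch assumes that wrappedExceptionIsIn inspects only the direct cause and that EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS contains IllegalStateException, which matches the stack trace in this report.

```java
import java.util.Set;

// Hypothetical model of the shutdown decision; none of these types are Kafka classes.
final class ShutdownDecisionSketch {

    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static final class StandInTimeoutException extends RuntimeException {
        StandInTimeoutException(final String message) {
            super(message);
        }
    }

    // Stand-in for org.apache.kafka.streams.errors.StreamsException.
    static final class StandInStreamsException extends RuntimeException {
        StandInStreamsException(final Throwable cause) {
            super(cause);
        }
    }

    // Assumption: exceptions of these types are never passed to the user's handler.
    static final Set<Class<? extends Throwable>> NOT_HANDLED_BY_USERS =
        Set.of(IllegalStateException.class, IllegalArgumentException.class);

    // Assumption: only the direct cause of the thrown exception is inspected.
    static boolean wrappedExceptionIsIn(final Throwable throwable,
                                        final Set<Class<? extends Throwable>> classes) {
        return throwable.getCause() != null
            && classes.contains(throwable.getCause().getClass());
    }

    // Returns SHUTDOWN_CLIENT for "internal" causes, otherwise the user's choice.
    static Response decide(final Throwable throwable, final Response userChoice) {
        return wrappedExceptionIsIn(throwable, NOT_HANDLED_BY_USERS)
            ? Response.SHUTDOWN_CLIENT
            : userChoice;
    }

    public static void main(final String[] args) {
        final Throwable timeout = new StandInTimeoutException("Timeout of 60000ms expired");

        // Shape seen in this report: StreamsException -> IllegalStateException -> TimeoutException.
        final Throwable asReported = new StandInStreamsException(new IllegalStateException(timeout));
        // Shape if the timeout were not rewrapped: StreamsException -> TimeoutException.
        final Throwable notRewrapped = new StandInStreamsException(timeout);

        System.out.println(decide(asReported, Response.REPLACE_THREAD));   // SHUTDOWN_CLIENT
        System.out.println(decide(notRewrapped, Response.REPLACE_THREAD)); // REPLACE_THREAD
    }
}
```

Under these assumptions, the user's handler is consulted only when the timeout is not rewrapped as an IllegalStateException, which is the behavior question 2) asks about.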
*Stacktrace*
{code:java|title=error log kafka-streams v. 3.1.0}
2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
    at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
    at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
2022-06-22 13:58:35,796 INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
{code}