[ https://issues.apache.org/jira/browse/KAFKA-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-14054:
---------------------------------------

    Assignee: Matthias J. Sax

> Unexpected client shutdown as TimeoutException is thrown as IllegalStateException
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-14054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0, 3.2.0, 3.1.1
>            Reporter: Donald
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> Re: https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2
> 1) A TimeoutException is rethrown as an IllegalStateException in {_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}, which causes the client to shut down in {_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}.
> 2) Should a TimeoutException be a recoverable error that the user is expected to handle?
> 3) This issue is exposed by the change in KAFKA-12887, which was introduced in kafka-streams 3.1.0.
> *code referenced*
> {code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded}
> public boolean commitNeeded() {
>     if (commitNeeded) {
>         return true;
>     } else {
>         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
>             final TopicPartition partition = entry.getKey();
>             try {
>                 final long offset = mainConsumer.position(partition);
>                 if (offset > entry.getValue() + 1) {
>                     commitNeeded = true;
>                     entry.setValue(offset - 1);
>                 }
>             } catch (final TimeoutException error) {
>                 // the `consumer.position()` call should never block, because we know that we did process data
>                 // for the requested partition and thus the consumer should have a valid local position
>                 // that it can return immediately
>                 // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
>                 throw new IllegalStateException(error);
>             } catch (final KafkaException fatal) {
>                 throw new StreamsException(fatal);
>             }
>         }
>         return commitNeeded;
>     }
> }
> {code}
> {code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable}
> private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(
>         final Throwable throwable,
>         final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
>     final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
>     if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
>         action = SHUTDOWN_CLIENT;
>     } else {
>         action = streamsUncaughtExceptionHandler.handle(throwable);
>     }
>     return action;
> }
>
> private void handleStreamsUncaughtException(
>         final Throwable throwable,
>         final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
>         final boolean skipThreadReplacement) {
>     final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action =
>         getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
>     if (oldHandler) {
>         log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
>             "The old handler will be ignored as long as a new handler is set.");
>     }
>     switch (action) {
>         case REPLACE_THREAD:
>             if (!skipThreadReplacement) {
>                 log.error("Replacing thread in the streams uncaught exception handler", throwable);
>                 replaceStreamThread(throwable);
>             } else {
>                 log.debug("Skipping thread replacement for recoverable error");
>             }
>             break;
>         case SHUTDOWN_CLIENT:
>             log.error("Encountered the following exception during processing " +
>                 "and Kafka Streams opted to " + action + "." +
>                 " The streams client is going to shut down now. ", throwable);
>             closeToError();
>             break;
> {code}
> *Stacktrace*
> {code:java|title=error log kafka-streams v. 3.1.0}
> 2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
> Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
>     at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
>     ... 1 common frames omitted
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
> 2022-06-22 13:58:35,796 INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
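
The shutdown path described in the report can be sketched without any Kafka dependency. This is a simplified stand-in, not the Kafka Streams implementation: the class `ShutdownDecisionSketch`, the string return values, the contents of `NOT_HANDLED_BY_USERS`, and the direct-cause check in `wrappedExceptionIsIn` are all assumptions made for illustration, and `java.util.concurrent.TimeoutException` stands in for Kafka's `TimeoutException`. It shows why wrapping the timeout in an `IllegalStateException` bypasses the user's handler, while the same timeout without that wrapper would reach it.

```java
import java.util.List;
import java.util.concurrent.TimeoutException;

public class ShutdownDecisionSketch {
    // Assumption: hypothetical stand-in for EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS.
    static final List<Class<? extends Throwable>> NOT_HANDLED_BY_USERS =
        List.of(IllegalStateException.class, IllegalArgumentException.class);

    // Assumption: only the direct cause of the outer exception is inspected.
    static boolean wrappedExceptionIsIn(Throwable t, List<Class<? extends Throwable>> classes) {
        return t.getCause() != null && classes.contains(t.getCause().getClass());
    }

    // Returns "SHUTDOWN_CLIENT" when the cause is on the list; otherwise
    // defers to whatever the (hypothetical) user handler would have chosen.
    static String actionFor(Throwable t, String userHandlerChoice) {
        return wrappedExceptionIsIn(t, NOT_HANDLED_BY_USERS) ? "SHUTDOWN_CLIENT" : userHandlerChoice;
    }

    public static void main(String[] args) {
        Throwable timeout = new TimeoutException("Timeout of 60000ms expired");

        // Shape seen in the stack trace: outer exception -> IllegalStateException -> timeout.
        Throwable asReported = new RuntimeException(new IllegalStateException(timeout));
        // Same timeout without the IllegalStateException wrapper.
        Throwable unwrapped = new RuntimeException(timeout);

        System.out.println(actionFor(asReported, "REPLACE_THREAD")); // prints SHUTDOWN_CLIENT
        System.out.println(actionFor(unwrapped, "REPLACE_THREAD"));  // prints REPLACE_THREAD
    }
}
```

In this sketch the user's choice of `REPLACE_THREAD` is only honored in the second case, which mirrors question 2 of the report: whether the timeout should be surfaced in a form that the user's handler gets to see.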