Donald created KAFKA-14054:
------------------------------

             Summary: Unexpected client shutdown as TimeoutException is thrown as IllegalStateException
                 Key: KAFKA-14054
                 URL: https://issues.apache.org/jira/browse/KAFKA-14054
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.1.1, 3.2.0, 3.1.0
            Reporter: Donald


Re: https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2

1) A TimeoutException is rethrown as an IllegalStateException in {_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}, which causes the client to shut down in {_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}.

2) Should a TimeoutException be treated as a recoverable error that the user is expected to handle?

3) This issue was exposed by KAFKA-12887, which was introduced in kafka-streams 3.1.0.

*Referenced code*
{code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded}
public boolean commitNeeded() {
    if (commitNeeded) {
        return true;
    } else {
        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            final TopicPartition partition = entry.getKey();
            try {
                final long offset = mainConsumer.position(partition);
                if (offset > entry.getValue() + 1) {
                    commitNeeded = true;
                    entry.setValue(offset - 1);
                }
            } catch (final TimeoutException error) {
                // the `consumer.position()` call should never block, because we know that we did process data
                // for the requested partition and thus the consumer should have a valid local position
                // that it can return immediately

                // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
                throw new IllegalStateException(error);
            } catch (final KafkaException fatal) {
                throw new StreamsException(fatal);
            }
        }

        return commitNeeded;
    }
}
{code}
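To make the position check in {{commitNeeded()}} easier to follow, here is a minimal, self-contained model of it in plain Java. It is a sketch under stated assumptions, not Kafka code: partitions are plain {{String}}s instead of {{TopicPartition}}, {{mainConsumer.position()}} is replaced by a {{ToLongFunction}}, and {{PositionTimeoutException}} is a hypothetical stand-in for Kafka's {{TimeoutException}}. It shows both paths: a position beyond {{lastProcessed + 1}} sets the commit flag, and a timeout surfaces as an {{IllegalStateException}}, matching the "Caused by" chain in the stack trace.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Sketch of the commitNeeded() check. Assumptions: String partitions instead of
// TopicPartition, and a local exception type instead of Kafka's TimeoutException.
public class CommitNeededSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class PositionTimeoutException extends RuntimeException {
        PositionTimeoutException(String message) { super(message); }
    }

    private boolean commitNeeded = false;
    // Last processed offset per partition (mirrors consumedOffsets).
    private final Map<String, Long> consumedOffsets = new HashMap<>();
    // Stand-in for mainConsumer.position(partition).
    private final ToLongFunction<String> position;

    CommitNeededSketch(ToLongFunction<String> position) {
        this.position = position;
    }

    void recordProcessed(String partition, long offset) {
        consumedOffsets.put(partition, offset);
    }

    boolean commitNeeded() {
        if (commitNeeded) {
            return true;
        }
        for (Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            try {
                long offset = position.applyAsLong(entry.getKey());
                // position is the next offset to fetch; if it lies beyond
                // lastProcessed + 1, some offsets were skipped without being
                // processed, so the committed offset should move forward.
                if (offset > entry.getValue() + 1) {
                    commitNeeded = true;
                    entry.setValue(offset - 1);
                }
            } catch (PositionTimeoutException error) {
                // As in the quoted code: the timeout is escalated as a fatal
                // IllegalStateException instead of being retried.
                throw new IllegalStateException(error);
            }
        }
        return commitNeeded;
    }

    public static void main(String[] args) {
        CommitNeededSketch gap = new CommitNeededSketch(p -> 12L);
        gap.recordProcessed("topic-4", 10L);
        System.out.println(gap.commitNeeded()); // true, because 12 > 10 + 1

        CommitNeededSketch timeout = new CommitNeededSketch(p -> {
            throw new PositionTimeoutException("position could not be determined");
        });
        timeout.recordProcessed("topic-4", 10L);
        try {
            timeout.commitNeeded();
        } catch (IllegalStateException e) {
            System.out.println("IllegalStateException caused by "
                    + e.getCause().getClass().getSimpleName());
        }
    }
}
{code}

The second {{main}} case prints "IllegalStateException caused by PositionTimeoutException", which is the same wrapping that appears at {{StreamTask.commitNeeded(StreamTask.java:1185)}} in the log.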


{code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable}
private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(
        final Throwable throwable,
        final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
    if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
        action = SHUTDOWN_CLIENT;
    } else {
        action = streamsUncaughtExceptionHandler.handle(throwable);
    }
    return action;
}

private void handleStreamsUncaughtException(final Throwable throwable,
                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                            final boolean skipThreadReplacement) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action =
            getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
    if (oldHandler) {
        log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                "The old handler will be ignored as long as a new handler is set.");
    }
    switch (action) {
        case REPLACE_THREAD:
            if (!skipThreadReplacement) {
                log.error("Replacing thread in the streams uncaught exception handler", throwable);
                replaceStreamThread(throwable);
            } else {
                log.debug("Skipping thread replacement for recoverable error");
            }
            break;
        case SHUTDOWN_CLIENT:
            log.error("Encountered the following exception during processing " +
                    "and Kafka Streams opted to " + action + "." +
                    " The streams client is going to shut down now. ", throwable);
            closeToError();
            break;
        // ... remaining cases omitted in this excerpt
    }
}
{code}
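The decision in {{getActionForThrowable()}} can be modelled as follows. This sketch assumes, without confirming it from the Kafka sources, that {{EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS}} contains {{IllegalStateException}} and that {{wrappedExceptionIsIn()}} inspects only the direct cause of the throwable. The stack trace below is consistent with that reading, since the top-level {{StreamsException}} wraps the {{IllegalStateException}} directly. {{StreamsException}}, {{TimeoutException}} and {{Response}} are local stand-ins for the Kafka types, and the set contents are hypothetical.

{code:java}
import java.util.Set;
import java.util.function.Function;

// Sketch of the getActionForThrowable() decision. Assumed, not quoted from Kafka:
// the contents of the "not handled by users" set and the direct-cause check.
public class ActionForThrowableSketch {

    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Local stand-ins for StreamsException and the client TimeoutException.
    static class StreamsException extends RuntimeException {
        StreamsException(Throwable cause) { super(cause); }
    }
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    // Hypothetical contents of EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS.
    static final Set<Class<? extends Throwable>> NOT_HANDLED_BY_USERS =
            Set.of(IllegalStateException.class, IllegalArgumentException.class);

    // Looks only at the direct cause, one level below the thrown exception.
    static boolean wrappedExceptionIsIn(Throwable t, Set<Class<? extends Throwable>> classes) {
        return t.getCause() != null && classes.contains(t.getCause().getClass());
    }

    static Response actionFor(Throwable t, Function<Throwable, Response> userHandler) {
        if (wrappedExceptionIsIn(t, NOT_HANDLED_BY_USERS)) {
            return Response.SHUTDOWN_CLIENT; // the user handler is bypassed
        }
        return userHandler.apply(t);
    }

    public static void main(String[] args) {
        // Same nesting as the log: StreamsException -> IllegalStateException -> TimeoutException
        Throwable reported = new StreamsException(
                new IllegalStateException(new TimeoutException("Timeout of 60000ms expired")));
        // A user handler that would prefer to replace the thread.
        Function<Throwable, Response> handler = t -> Response.REPLACE_THREAD;

        System.out.println(actionFor(reported, handler)); // SHUTDOWN_CLIENT

        // Without the IllegalStateException wrapper, the user handler decides.
        Throwable unwrapped = new StreamsException(new TimeoutException("Timeout of 60000ms expired"));
        System.out.println(actionFor(unwrapped, handler)); // REPLACE_THREAD
    }
}
{code}

Under these assumptions the user's handler is never consulted for this exception, so returning {{REPLACE_THREAD}} from a custom {{StreamsUncaughtExceptionHandler}} would not prevent the shutdown.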

*Stacktrace*

{code:java|title=error log kafka-streams v. 3.1.0}
2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
        at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
        at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
        ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
2022-06-22 13:58:35,796  INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
{code}
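The three "Caused by" levels in the log can be reproduced by walking {{Throwable.getCause()}}. The sketch below uses local stand-in exception types and only {{java.lang}}/{{java.util}} APIs; it lists the chain from the outermost throwable to the root and checks whether a {{TimeoutException}} occurs anywhere in it. A check over the full chain finds the timeout, whereas a check on the direct cause only sees the {{IllegalStateException}}.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Walks a cause chain the way the "Caused by:" lines in the log do.
// StreamsException and TimeoutException are local stand-ins, not Kafka classes.
public class CauseChainSketch {

    static class StreamsException extends RuntimeException {
        StreamsException(Throwable cause) { super(cause); }
    }
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    // Simple class names from the outermost throwable down to the root cause.
    static List<String> chain(Throwable t) {
        List<String> names = new ArrayList<>();
        // Identity-based set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            names.add(cur.getClass().getSimpleName());
        }
        return names;
    }

    // True if any throwable in the chain is an instance of the given type.
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable reported = new StreamsException(
                new IllegalStateException(new TimeoutException("Timeout of 60000ms expired")));
        // [StreamsException, IllegalStateException, TimeoutException]
        System.out.println(chain(reported));
        System.out.println(hasCause(reported, TimeoutException.class)); // true
        System.out.println(reported.getCause() instanceof TimeoutException); // false
    }
}
{code}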




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
