[ https://issues.apache.org/jira/browse/KAFKA-14128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucia Cerchie reassigned KAFKA-14128:
-------------------------------------

    Assignee: Lucia Cerchie

> Kafka Streams terminates on topic check
> ---------------------------------------
>
>                 Key: KAFKA-14128
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14128
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0
>         Environment: Any
>            Reporter: Patrik Kleindl
>            Assignee: Lucia Cerchie
>            Priority: Major
>
> Our streams application shut down unexpectedly after some network issues that
> should have been easily recoverable.
> Logs:
>
> {code:java}
> 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from node 3 due to request timeout.
> 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled in-flight METADATA request with correlation id 985 due to node 3 being disconnected (elapsed time since creation: 60023ms, elapsed time since send: 60023ms, request timeout: 30000ms)
> 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected error during topic description for L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog.
> Error message was: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
> 2022-07-29 13:39:37.869 INFO 25843 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
> {code}
> I think the relevant code is in
> [https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550]
> {code:java}
> topicFuture.getValue().get();{code}
> without a timeout value cannot throw a TimeoutException, so the
> TimeoutException of the AdminClient will be an ExecutionException and hit the
> last else branch where the StreamsException is thrown.
> Possible fix:
> Use the KafkaFuture method with timeout:
> {code:java}
> public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;{code}
> instead of
> {code:java}
> public abstract T get() throws InterruptedException, ExecutionException;{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
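
The wrapping behaviour described in the report can be reproduced without a broker. The following is only a minimal sketch under stated assumptions: it uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture` (the two `get` signatures quoted above are the same shape), and `java.util.concurrent.TimeoutException` as a stand-in for `org.apache.kafka.common.errors.TimeoutException`. The class and method names (`FutureTimeoutSketch`, `classifyUntimedGet`, `classifyTimedGet`) are hypothetical and do not come from the Kafka code base.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutSketch {

    // get() without a timeout: a timeout reported by the producer of the
    // future can only arrive wrapped in an ExecutionException.
    static String classifyUntimedGet(CompletableFuture<String> future) {
        try {
            future.get();
            return "completed";
        } catch (ExecutionException e) {
            return (e.getCause() instanceof TimeoutException) ? "wrapped-timeout" : "other-failure";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    // get(timeout, unit): the caller's own deadline surfaces directly as a
    // TimeoutException, but an already-failed future is still wrapped.
    static String classifyTimedGet(CompletableFuture<String> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "direct-timeout";
        } catch (ExecutionException e) {
            return (e.getCause() instanceof TimeoutException) ? "wrapped-timeout" : "other-failure";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-in for an admin call that has already timed out internally.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new TimeoutException("describeTopics timed out"));

        // Stand-in for a call that never completes.
        CompletableFuture<String> pending = new CompletableFuture<>();

        System.out.println(classifyUntimedGet(failed));    // wrapped-timeout
        System.out.println(classifyTimedGet(failed, 50));  // wrapped-timeout
        System.out.println(classifyTimedGet(pending, 50)); // direct-timeout
    }
}
```

In this sketch, switching to the timed `get` changes the outcome only for a future that is still pending. A future that has already failed with a timeout is wrapped in an `ExecutionException` either way, so a handler that wants to treat the timeout as retriable would presumably also need to inspect the cause.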