[ 
https://issues.apache.org/jira/browse/KAFKA-14128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14128.
-------------------------------------
    Fix Version/s: 3.5.0
                   3.4.1
       Resolution: Fixed

> Kafka Streams terminates on topic check
> ---------------------------------------
>
>                 Key: KAFKA-14128
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14128
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0
>         Environment: Any
>            Reporter: Patrik Kleindl
>            Assignee: Lucia Cerchie
>            Priority: Major
>             Fix For: 3.5.0, 3.4.1
>
>
> Our streams application shut down unexpectedly after some network issues that 
> should have been easily recoverable.
> Logs:
>  
> {code:java}
> 2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
> org.apache.kafka.clients.NetworkClient   : [AdminClient 
> clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting 
> from node 3 due to request timeout.
> 2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
> org.apache.kafka.clients.NetworkClient   : [AdminClient 
> clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled 
> in-flight METADATA request with correlation id 985 due to node 3 being 
> disconnected (elapsed time since creation: 60023ms, elapsed time since send: 
> 60023ms, request timeout: 30000ms)
> 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] 
> o.a.k.s.p.i.InternalTopicManager         : stream-thread [main] Unexpected 
> error during topic description for 
> L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog.
> Error message was: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, 
> nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
> 2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
> I think the relevant code is in 
> [https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550]
> {code:java}
> topicFuture.getValue().get();{code}
> Called without a timeout value, this cannot throw a TimeoutException itself, 
> so the TimeoutException of the AdminClient arrives wrapped in an 
> ExecutionException and hits the last else branch, where the StreamsException 
> is thrown.
> Possible fix:
> Use the KafkaFuture method with timeout:
> {code:java}
> public abstract T get(long timeout, TimeUnit unit)
>         throws InterruptedException, ExecutionException, TimeoutException;{code}
> instead of 
> {code:java}
> public abstract T get() throws InterruptedException, ExecutionException;{code}
>  
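
The wrapping behaviour described above can be reproduced without a broker. The following is only a minimal sketch, assuming the JDK's CompletableFuture as a stand-in for KafkaFuture; the class WrappedTimeoutDemo, the exception SimulatedAdminTimeout and the helper classify are hypothetical names for illustration and are not part of Kafka or of InternalTopicManager. It shows that a timeout raised inside the asynchronous call surfaces as the cause of an ExecutionException, while the timed get only throws java.util.concurrent.TimeoutException when the wait itself expires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WrappedTimeoutDemo {

    // Hypothetical stand-in for a timeout raised inside the asynchronous call.
    // It is deliberately NOT java.util.concurrent.TimeoutException.
    static class SimulatedAdminTimeout extends RuntimeException {
        SimulatedAdminTimeout(String message) {
            super(message);
        }
    }

    // Waits on the future with a timeout and reports which path was taken.
    static String classify(CompletableFuture<Void> future, long timeoutMs)
            throws InterruptedException {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            // Thrown by get(timeout, unit) only when the wait itself expires.
            return "wait timed out";
        } catch (ExecutionException e) {
            // A failure inside the call is wrapped; the cause must be inspected.
            if (e.getCause() instanceof SimulatedAdminTimeout) {
                return "wrapped timeout";
            }
            return "other failure";
        }
    }

    public static void main(String[] args) throws Exception {
        // Case 1: the call already failed with its own timeout error.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new SimulatedAdminTimeout("simulated describeTopics timeout"));
        System.out.println(classify(failed, 100));

        // Case 2: the call never completes, so the timed wait expires.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(classify(pending, 100));
    }
}
```

In this sketch a future that has already failed takes the ExecutionException path even when get is called with a timeout, so code that wants to treat such a failure as retriable would need to check the cause explicitly.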



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
