Patrik Kleindl created KAFKA-14128:
--------------------------------------

             Summary: Kafka Streams terminates on topic check
                 Key: KAFKA-14128
                 URL: https://issues.apache.org/jira/browse/KAFKA-14128
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.0.0
         Environment: Any
            Reporter: Patrik Kleindl


Our streams application shut down unexpectedly after some network issues that 
should have been easily recoverable.

Logs:

 
{code:java}
2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from node 3 due to request timeout.
2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled in-flight METADATA request with correlation id 985 due to node 3 being disconnected (elapsed time since creation: 60023ms, elapsed time since send: 60023ms, request timeout: 30000ms)
2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager         : stream-thread [main] Unexpected error during topic description for L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog.
Error message was: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
{code}
I think the relevant code is in InternalTopicManager, around line 524:
[InternalTopicManager.java#L523-L550|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550]
{code:java}
topicFuture.getValue().get();{code}
Called without a timeout value, this get() cannot throw a TimeoutException 
itself, so the AdminClient's TimeoutException arrives wrapped in an 
ExecutionException and hits the last else branch, where the StreamsException 
is thrown.
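
To illustrate the wrapping described above, here is a minimal sketch that does not use the Kafka classes. It assumes java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture, and AdminTimeoutException is a hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException (also an unchecked exception). It only shows how a failure inside the future surfaces from an untimed get(); it is not the InternalTopicManager code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WrappedTimeoutDemo {

    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException.
    static class AdminTimeoutException extends RuntimeException {
        AdminTimeoutException(String message) {
            super(message);
        }
    }

    // Calls get() without a timeout and reports how a failure surfaces.
    static String outcomeOf(CompletableFuture<String> topicFuture) throws InterruptedException {
        try {
            topicFuture.get();
            return "completed";
        } catch (ExecutionException e) {
            // A failure inside the future is always delivered as the cause
            // of an ExecutionException, whatever its own type is.
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new AdminTimeoutException("describeTopics timed out"));
        System.out.println(outcomeOf(timedOut));
    }
}
```

In this sketch the only way to tell a timeout from any other failure is to inspect getCause() inside the ExecutionException handler.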

Possible fix:

Use the KafkaFuture method with timeout:
{code:java}
public abstract T get(long timeout, TimeUnit unit) throws InterruptedException,
        ExecutionException, TimeoutException;{code}
instead of 
{code:java}
public abstract T get() throws InterruptedException, ExecutionException;{code}
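
A minimal sketch of how the timed variant behaves, again assuming java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture (its get(long, TimeUnit) declares the same three exceptions). The method name awaitWithTimeout and the 50 ms value are illustrative assumptions, not taken from the Kafka code base.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetDemo {

    // Waits at most timeoutMs for the future and reports which path was taken.
    static String awaitWithTimeout(CompletableFuture<String> future, long timeoutMs)
            throws InterruptedException {
        try {
            return "completed: " + future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Raised by get(timeout, unit) itself when the wait elapses.
            return "timed out waiting";
        } catch (ExecutionException e) {
            // Still raised when the future completed exceptionally.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        System.out.println(awaitWithTimeout(pending, 50));
        System.out.println(awaitWithTimeout(CompletableFuture.completedFuture("topic-a"), 50));
    }
}
```

With the timed call, a wait that elapses is reported as java.util.concurrent.TimeoutException, while a future that has already failed is still reported as an ExecutionException, so a caller may need to handle both paths.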
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
