[ https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-14401.
-----------------------------------
    Fix Version/s: 3.9.0
       Resolution: Fixed

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-14401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14401
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Sagar Rao
>            Assignee: Sagar Rao
>            Priority: Major
>             Fix For: 3.9.0
>
>
> When a connector or task tries to read offsets from the offsets topic, it 
> calls the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
> from the underlying KafkaOffsetBackingStore, which in turn invokes 
> `KafkaBasedLog#readToEnd` and passes along a Callback. That method essentially 
> adds the Callback to a queue of callbacks managed by KafkaBasedLog.
> Within KafkaBasedLog there is a WorkThread which polls this callback queue in 
> an infinite loop and executes the callbacks. The while loop is wrapped in an 
> enclosing try/catch block; if an exception is thrown that none of the inner 
> catch blocks handles, control reaches the outermost catch block and the 
> WorkThread terminates. The connectors/tasks are not aware of this and keep 
> submitting callbacks to KafkaBasedLog with nobody left to process them. This 
> can be seen in the thread dumps as well:
>  
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
> tid=0x00007f8d9c037000 nid=0x5d00 waiting on condition  [0x00007f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>     - parking to wait for  <0x000000070345c9a8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>     at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>     at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>     at 
> org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>  {code}
>  
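> To make the failure mode concrete, below is a minimal, self-contained Java 
> sketch of the pattern described above. It is an illustration only, not the 
> actual KafkaBasedLog implementation, and the class and variable names in it 
> are made up: a single work thread drains a queue of callbacks inside a while 
> loop guarded by one outer try/catch, and once an unexpected exception escapes 
> that loop, any caller parked on its latch waits forever.
> 
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Queue;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.TimeUnit;
> 
> // Simplified illustration only; not the real KafkaBasedLog code.
> public class WorkThreadHangDemo {
>     private static final Queue<Runnable> readToEndCallbacks = new ArrayDeque<>();
> 
>     public static void main(String[] args) throws Exception {
>         Thread workThread = new Thread(() -> {
>             try {
>                 while (true) {
>                     // Stand-in for readToLogEnd(): an unexpected error (e.g. an
>                     // SslAuthenticationException wrapped in a ConnectException)
>                     // escapes every inner catch block.
>                     throw new RuntimeException("Error while getting end offsets");
>                 }
>             } catch (Throwable t) {
>                 // Outermost catch: the error is logged ("Unexpected exception in ...")
>                 // and the thread simply terminates; queued callbacks are never run.
>                 System.err.println("Unexpected exception in work thread: " + t);
>             }
>         }, "KafkaBasedLog Work Thread - connect-offsets");
>         workThread.start();
>         workThread.join();
> 
>         // A task now submits an offset read; nothing is left to complete it.
>         CountDownLatch done = new CountDownLatch(1);
>         synchronized (readToEndCallbacks) {
>             readToEndCallbacks.add(done::countDown);
>         }
>         // Equivalent to blocking in ConvertingFutureCallback.get(): without a
>         // timeout this parks forever, exactly as in the thread dump above.
>         System.out.println("offsets() completed: " + done.await(2, TimeUnit.SECONDS));
>     }
> }
> {code}
> 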
> We need a mechanism to fail all such offset read requests. Even if we were to 
> restart the thread, chances are it would fail again with the same error, so 
> the offset fetch would remain stuck indefinitely.
> As explained above, this scenario happens when the exception thrown is not 
> caught by any of the inner catch blocks and control lands in the outermost 
> catch block. In my experience, I have seen this situation a few times, for 
> example when the exception thrown is:
> 
> {code:java}
> [2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:440)
> org.apache.kafka.connect.errors.ConnectException: Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
>       at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)
>       at org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
>       at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
>       at org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)
>       at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
>       at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>       at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>       at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>       at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>       at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
>       ... 4 more
> {code}
> At this point, once control exits the 
> [catch block|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L608-L610],
> the WorkThread is dead, and the line `Unexpected exception in` can be found in 
> the logs.
> Another example is when the worker is already OOM; in such cases the work 
> thread would die as well. This is not a great example, because once the worker 
> is OOM we can't make any progress anyway, but it is included for 
> completeness' sake.
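> 
> A possible direction, shown purely as a hypothetical sketch (the method and 
> field names below are made up and this is not the actual patch that resolved 
> this ticket): when the WorkThread dies, record the fatal error, fail every 
> callback already queued, and reject subsequent read requests, so that 
> `OffsetStorageReaderImpl#offsets` receives an exception instead of parking 
> forever. Stand-in types (a BiConsumer in place of Connect's Callback) keep 
> the sketch self-contained:
> 
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Queue;
> import java.util.function.BiConsumer;
> 
> // Hypothetical sketch of the guard described above; names are invented and
> // this is not the actual fix merged for this ticket.
> public class FailFastReadToEndSketch {
>     private final Queue<BiConsumer<Throwable, Void>> readToEndCallbacks = new ArrayDeque<>();
>     private Throwable fatalError; // guarded by synchronized (readToEndCallbacks)
> 
>     // Would be called from the work thread's outermost catch block: remember
>     // the error and fail everything already queued instead of exiting silently.
>     void onWorkThreadDeath(Throwable t) {
>         synchronized (readToEndCallbacks) {
>             fatalError = t;
>             readToEndCallbacks.forEach(cb -> cb.accept(t, null));
>             readToEndCallbacks.clear();
>         }
>     }
> 
>     // Would back the offset read path: refuse new reads once the thread is
>     // gone, so callers get an error instead of waiting forever on a latch.
>     void readToEnd(BiConsumer<Throwable, Void> callback) {
>         synchronized (readToEndCallbacks) {
>             if (fatalError != null) {
>                 callback.accept(new IllegalStateException(
>                         "Offsets work thread has died", fatalError), null);
>                 return;
>             }
>             readToEndCallbacks.add(callback);
>         }
>         // ... signal the work thread to read to the log end, as today ...
>     }
> }
> {code}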



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
