[ https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chaitanya Mukka reassigned KAFKA-14401:
---------------------------------------

    Assignee: Chaitanya Mukka

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-14401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14401
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Sagar Rao
>            Assignee: Chaitanya Mukka
>            Priority: Major
>
> When a connector or task tries to read offsets from the offsets topic, it
> invokes the `OffsetStorageReaderImpl#offsets` method. This method gets a Future
> from the underlying `KafkaOffsetBackingStore`, which invokes the
> `KafkaBasedLog#readToEnd` method and passes it a Callback. `readToEnd`
> essentially adds the Callback to a managed queue of pending callbacks.
> Within KafkaBasedLog, a WorkThread polls this queue and executes the
> callbacks in an infinite loop.
> However, the while loop is enclosed in an outer try/catch block. If an
> exception is thrown that none of the inner catch blocks handles, control
> passes to the outermost catch block and the WorkThread terminates. The
> connectors/tasks are not aware of this, so they keep submitting callbacks to
> KafkaBasedLog with nobody left to process them.
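The failure mode can be reproduced with a small, self-contained Java model. Everything in it is hypothetical: `WorkThreadModel`, `readToEnd()`, `injectUnexpectedFailure()` and the `restartOnFailure` flag are illustrative stand-ins, not the real `KafkaBasedLog` API, and the inner `IllegalStateException` catch merely represents "errors the loop handles explicitly". A callback that throws anything else escapes to the outermost catch and ends the thread, so a later `readToEnd()` future never completes. The `restartOnFailure` branch is one possible reading of the ticket's suggestion to restart the thread from the outermost catch block.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical, heavily simplified model of a log whose single work thread
 * drains a callback queue. This is NOT the real KafkaBasedLog code.
 */
public class WorkThreadModel {
    private final BlockingQueue<Runnable> callbacks = new LinkedBlockingQueue<>();
    private final boolean restartOnFailure;

    WorkThreadModel(boolean restartOnFailure) {
        this.restartOnFailure = restartOnFailure;
    }

    void start() {
        Thread worker = new Thread(this::runLoop, "work-thread");
        worker.setDaemon(true); // do not keep the JVM alive if the loop blocks forever
        worker.start();
    }

    private void runLoop() {
        try {
            while (true) {
                Runnable callback = callbacks.take();
                try {
                    callback.run();
                } catch (IllegalStateException e) {
                    // Stand-in for errors the loop handles explicitly: keep going.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // normal shutdown path
        } catch (Throwable t) {
            // Outermost catch: any other error ends the loop, and with it the thread.
            // Queued and future callbacks are never run unless a new thread starts.
            if (restartOnFailure) {
                start(); // hypothetical fix: hand the queue to a fresh work thread
            }
        }
    }

    /** Queues a "read to end" request; the future completes when the work thread runs it. */
    Future<String> readToEnd() {
        CompletableFuture<String> result = new CompletableFuture<>();
        callbacks.add(() -> result.complete("offsets"));
        return result;
    }

    /** Queues a callback that throws an error the inner catch does not handle. */
    void injectUnexpectedFailure() {
        callbacks.add(() -> {
            throw new RuntimeException("unexpected failure in work thread");
        });
    }

    /** Returns true if a read submitted after the failure completes within the timeout. */
    static boolean callbackCompletes(boolean restartOnFailure)
            throws InterruptedException, ExecutionException {
        WorkThreadModel log = new WorkThreadModel(restartOnFailure);
        log.start();
        log.injectUnexpectedFailure();
        Future<String> offsets = log.readToEnd();
        try {
            // The task thread in the dump waits with an unbounded get(); a timeout
            // is used here only so the demo terminates.
            offsets.get(2, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("without restart: " + callbackCompletes(false));
        System.out.println("with restart: " + callbackCompletes(true));
    }
}
```

Running `main` should print `without restart: false` (after the two-second timeout) and then `with restart: true`. Restarting unconditionally is only one option; a real fix would likely also need to bound restarts or surface repeated failures to the connector/task.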
>
> This can be seen in the thread dumps as well:
>
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x00007f8d9c037000 nid=0x5d00 waiting on condition [0x00007f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>         at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>         - parking to wait for <0x000000070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>         at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>         at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> {code}
>
> We need a mechanism to restart the WorkThread if it dies. This could be done
> in the outermost catch block, for example.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)