[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2024-05-08 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Description: 
When a connector or task tries to read offsets from the offsets topic, it 
calls the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
from the underlying KafkaOffsetBackingStore, which invokes 
`KafkaBasedLog#readToEnd` and passes it a Callback. `readToEnd` essentially 
adds the Callback to a queue of callbacks waiting to be processed.

Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
callbacks in an infinite loop. However, the while loop is enclosed in a 
try/catch block. If an exception is thrown that none of the inner catch blocks 
handles, control goes to the outermost catch block and the WorkThread 
terminates. The connectors/tasks are not aware of this and keep submitting 
callbacks to KafkaBasedLog with nobody processing them. This can be seen in 
thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
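
The hang can be reproduced in miniature. The following is only a sketch and not the KafkaBasedLog code: the class `StuckReadSketch`, the method `readCompletes`, and the simulated failure are hypothetical names introduced for illustration. A worker thread drains a queue of latches inside a loop wrapped by a catch-all block, and a caller waits on its latch. The caller here waits with a timeout so the example terminates; the thread dump above suggests the real caller waits without one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StuckReadSketch {

    /**
     * Submits one "read" to a worker thread and reports whether it completed
     * within 500 ms. If workerDies is true, the worker throws before it
     * processes anything, mimicking an exception that reaches the outermost
     * catch block.
     */
    static boolean readCompletes(boolean workerDies) {
        // Hypothetical stand-in for the queue of pending callbacks.
        BlockingQueue<CountDownLatch> callbacks = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    if (workerDies) {
                        throw new IllegalStateException("simulated failure");
                    }
                    // Process the next callback by releasing its waiter.
                    callbacks.take().countDown();
                }
            } catch (Throwable t) {
                // Outermost catch: nothing is rethrown or retried, so the
                // thread simply ends here.
            }
        }, "work-thread-sketch");
        worker.setDaemon(true);
        worker.start();

        try {
            if (workerDies) {
                worker.join(); // make sure the worker is already gone
            }
            CountDownLatch done = new CountDownLatch(1);
            callbacks.add(done); // submitting still succeeds: the queue accepts it
            return done.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("healthy worker: completed=" + readCompletes(false));
        System.out.println("dead worker: completed=" + readCompletes(true));
    }
}
```

With a dead worker the submission is accepted but never processed, which is the state the parked `task-thread-connector-0` thread is in.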
 

We need a mechanism to fail all such offset read requests. Restarting the 
thread alone is not enough: it would most likely fail again with the same 
error, so the offset fetch would remain stuck indefinitely.
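
One possible shape for such a mechanism is sketched below. This is an assumption about how it could look, not the fix applied in Kafka: `FailPendingReadsSketch`, `submitRead`, `startWorker`, and `failAll` are hypothetical names, and `CompletableFuture` stands in for the Callback used by KafkaBasedLog. When the worker's outermost catch block is reached, it records the error, fails the in-flight request and every queued request, and later submissions fail immediately instead of being queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FailPendingReadsSketch {
    private final BlockingQueue<CompletableFuture<String>> pending = new LinkedBlockingQueue<>();
    private final Object lock = new Object();
    private Throwable fatalError; // guarded by lock; non-null once the worker has died

    /** Queues a read request, or fails it right away if the worker is already dead. */
    public CompletableFuture<String> submitRead() {
        CompletableFuture<String> request = new CompletableFuture<>();
        synchronized (lock) {
            if (fatalError != null) {
                request.completeExceptionally(fatalError);
            } else {
                pending.add(request);
            }
        }
        return request;
    }

    /** Starts the worker; readStep stands in for the work that may throw. */
    public Thread startWorker(Runnable readStep) {
        Thread worker = new Thread(() -> {
            CompletableFuture<String> current = null;
            try {
                while (true) {
                    current = pending.take();
                    readStep.run();
                    current.complete("offsets");
                    current = null;
                }
            } catch (Throwable e) {
                // Outermost catch: propagate the failure instead of dying silently.
                failAll(e, current);
            }
        }, "work-thread-sketch");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    private void failAll(Throwable cause, CompletableFuture<String> inFlight) {
        List<CompletableFuture<String>> toFail = new ArrayList<>();
        synchronized (lock) {
            fatalError = cause;      // later submissions now fail fast
            pending.drainTo(toFail); // everything already queued
        }
        if (inFlight != null) {
            toFail.add(inFlight);
        }
        for (CompletableFuture<String> request : toFail) {
            request.completeExceptionally(cause);
        }
    }

    static String describe(CompletableFuture<String> f) throws InterruptedException {
        try {
            return "completed: " + f.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "still pending";
        }
    }

    /** Runs one request queued before the failure and one submitted after it. */
    static List<String> runDemo() {
        FailPendingReadsSketch log = new FailPendingReadsSketch();
        CompletableFuture<String> queued = log.submitRead();
        Thread worker = log.startWorker(() -> {
            throw new IllegalStateException("SSL handshake failed (simulated)");
        });
        List<String> results = new ArrayList<>();
        try {
            worker.join();
            results.add(describe(queued));
            results.add(describe(log.submitRead()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

Recording the error under the same lock that guards enqueueing closes the window in which a request could be queued after the drain and then wait forever.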

As explained above, this happens mainly when the exception thrown is not 
caught by any of the inner catch blocks and control ends up in the outermost 
catch block. I have seen this on a few occasions, when the exception thrown 
was:

 
 
{code:java}
[2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:440)
org.apache.kafka.connect.errors.ConnectException: Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
  at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)
  at org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
  at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
  at org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)
  at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
  at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
  at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
  at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
  at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
  at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
  ... 4 more
{code}
At this point the WorkThread is dead: once control leaves the [catch 
block|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L608-L610], 
the thread exits, and the logs contain a line starting with 
`Unexpected exception in`.

Another example is a worker that has already run out of memory (OOM); in that 
case the work thread would die as well. This is not a good example, because 
once the worker is OOM we can't make any progress anyway, but I am adding it 
for completeness' sake.

  was:
When a connector or task tries to read the offsets from the off

[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-09-11 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Description: 
When a connector or task tries to read offsets from the offsets topic, it 
calls the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
from the underlying KafkaOffsetBackingStore, which invokes 
`KafkaBasedLog#readToEnd` and passes it a Callback. `readToEnd` essentially 
adds the Callback to a queue of callbacks waiting to be processed.

Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
callbacks in an infinite loop. However, the while loop is enclosed in a 
try/catch block. If an exception is thrown that none of the inner catch blocks 
handles, control goes to the outermost catch block and the WorkThread 
terminates. The connectors/tasks are not aware of this and keep submitting 
callbacks to KafkaBasedLog with nobody processing them. This can be seen in 
thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
 

We need a mechanism to fail all such offset read requests. Restarting the 
thread alone is not enough: it would most likely fail again with the same 
error, so the offset fetch would remain stuck indefinitely.

 

  was:
When a connector or task tries to read the offsets from the offsets topic, it 
issues `OffsetStorageImpl#offsets` method. This method gets a Future from the 
underneath KafkaBackingStore. KafkaBackingStore invokes 
`KafkaBasedLog#readToEnd` method and passes the Callback. This method 
essentially adds the Callback to a Queue of callbacks that are being managed.

Within KafkaBasedLog, there's a WorkThread which keeps polling over the 
callback queue and executes them and it does this in an infinite loop. However, 
there is an enclosing try/catch block around the while loop. If there's an 
exception thrown which is not caught by any of the other catch blocks, the 
control goes to the outermost catch block and the WorkThread is terminated. 
However, the connectors/tasks are not aware of this and they would keep 
submitting callbacks to KafkaBasedLog with nobody processing them. This can be 
seen in the thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
 

We need a mechanism to restart the WorkThread if it dies. Thi

[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-02-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Labels:   (was: KafkaConnect)

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Priority: Major
>
> When a connector or task tries to read offsets from the offsets topic, it 
> calls the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
> from the underlying KafkaOffsetBackingStore, which invokes 
> `KafkaBasedLog#readToEnd` and passes it a Callback. `readToEnd` essentially 
> adds the Callback to a queue of callbacks waiting to be processed.
> Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
> callbacks in an infinite loop. However, the while loop is enclosed in a 
> try/catch block. If an exception is thrown that none of the inner catch 
> blocks handles, control goes to the outermost catch block and the WorkThread 
> terminates. The connectors/tasks are not aware of this and keep submitting 
> callbacks to KafkaBasedLog with nobody processing them. This can be seen in 
> thread dumps as well:
>  
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
> tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>     - parking to wait for  <0x00070345c9a8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>     at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>     at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>     at 
> org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>  {code}
>  
> We need a mechanism to restart the WorkThread if it dies. This could be done 
> in the outermost catch block for example.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-02-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Component/s: KafkaConnect

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Priority: Major
>  Labels: KafkaConnect
>


[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-02-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Labels: KafkaConnect  (was: Connect)

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Priority: Major
>  Labels: KafkaConnect
>


[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-02-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Labels: Connect  (was: )

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Priority: Major
>  Labels: Connect
>


[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2022-11-18 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Description: 
When a connector or task tries to read offsets from the offsets topic, it 
calls the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
from the underlying KafkaOffsetBackingStore, which invokes 
`KafkaBasedLog#readToEnd` and passes it a Callback. `readToEnd` essentially 
adds the Callback to a queue of callbacks waiting to be processed.

Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
callbacks in an infinite loop. However, the while loop is enclosed in a 
try/catch block. If an exception is thrown that none of the inner catch blocks 
handles, control goes to the outermost catch block and the WorkThread 
terminates. The connectors/tasks are not aware of this and keep submitting 
callbacks to KafkaBasedLog with nobody processing them. This can be seen in 
thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
 

We need a mechanism to restart the WorkThread if it dies. This could be done in 
the outermost catch block for example.

 

  was:
When a connector or task tries to read the offsets from the offsets topic, it 
issues `OffsetStorageImpl#offsets` method. This method gets a Future from the 
underneath KafkaBackingStore. KafkaBackingStore invokes 
`KafkaBasedLog#readToEnd` method and passes the Callback. This method 
essentially adds the Callback to a Queue of callbacks that are being managed.

Within KafkaBasedLog, there's a WorkThread which keeps polling over the 
callback queue and executes them and it does this in an infinite loop. However, 
there is an enclosing try/catch block around the while loop. If there's an 
exception thrown which is not caught by any of the other catch blocks, the 
control goes to the outermost catch block and the WorkThread is terminated. 
However, the connectors/tasks are not aware of this and they would keep 
submitting callbacks to KafkaBasedLog with nobody processing them. This can be 
seen in the thread dumps as well:

 

```

"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)

```

 

We need a mechanism to restart the WorkThread if it dies. This could be done in 
the outermost catch block for example.

 


> Connector/Tasks reading offse