[
https://issues.apache.org/jira/browse/KAFKA-17871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Greg Harris updated KAFKA-17871:
--------------------------------
Description:
In KAFKA-9051, source task offset reading was modified to allow for
in-progress read Futures to be cancelled during task shutdown. The
OffsetStorageReaderImpl#offsetReadFutures Set uses explicitly synchronized accesses
to prevent data races between task cancellation and connectors reading offsets.
A thread executing the OffsetStorageReaderImpl#offsets method can lock the Set and
then call Producer#flush inside KafkaBasedLog#flush.
At the same time, the herder thread may try to shut down the task, time out,
and call AbstractWorkerSourceTask#cancel. This cancellation attempts to lock
the same Set, and must wait for the Producer#flush to complete. If the task's
producer is unhealthy, this can block the herder thread indefinitely.
See the following stack traces:
{noformat}
java.lang.Thread.State: BLOCKED (on object monitor)
at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.close(OffsetStorageReaderImpl.java:148)
- waiting to lock <0x00000006e6ce0748> (a java.util.HashSet)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.cancel(AbstractWorkerSourceTask.java:288)
at
org.apache.kafka.connect.runtime.Worker.awaitStopTask(Worker.java:1036)
at
org.apache.kafka.connect.runtime.Worker.awaitStopTasks(Worker.java:1054)
at
org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:1082)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$23(DistributedHerder.java:1369)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$1647/0x00007f3d01941b28.call(Unknown
Source)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2240)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
at
java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
at
java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
at
java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at java.lang.Thread.run([email protected]/Thread.java:840){noformat}
and
{noformat}
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000006e4f9d610> (a
java.util.concurrent.CountDownLatch$Sync)
at
java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:211)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire([email protected]/AbstractQueuedSynchronizer.java:715)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1047)
at
java.util.concurrent.CountDownLatch.await([email protected]/CountDownLatch.java:230)
at
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:1075)
at
org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1215)
at
org.apache.kafka.connect.util.KafkaBasedLog$$Lambda$861/0x00007f3d017dc9d8.accept(Unknown
Source)
at java.util.Optional.ifPresent([email protected]/Optional.java:178)
at
org.apache.kafka.connect.util.KafkaBasedLog.flush(KafkaBasedLog.java:345)
at
org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:334)
at
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:295)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.lambda$getFromStore$5(ConnectorOffsetBackingStore.java:348)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore$$Lambda$1277/0x00007f3d0184d600.apply(Unknown
Source)
at java.util.Optional.map([email protected]/Optional.java:260)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.getFromStore(ConnectorOffsetBackingStore.java:348)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.get(ConnectorOffsetBackingStore.java:208)
at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
- locked <0x00000006e6ce0748> (a java.util.HashSet)
at
io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
at
io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:365)
at
io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
at
io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)
at
io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at
org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$990/0x00007f3d01814e08.run(Unknown
Source)
at
java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
at
java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
at
java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at java.lang.Thread.run([email protected]/Thread.java:840){noformat}
was:
In KAFKA-9051, source task offsets reading was modified to allow for
in-progress read Futures to be cancelled during task shutdown. The
OffsetReaderStorageImpl#offsetReadFutures uses explicitly synchronized accesses
to prevent data races between task cancellation and connectors reading offsets.
A thread executing OffsetReaderStorageImpl#offsets method can lock the Set, and
then call Producer#flush inside KafkaBasedLog#flush.
At the same time, the herder thread may try to shut down the task, time out,
and call AbstractWorkerSourceTask#cancel. This cancellation attempts to lock
the Set again, and must wait for the Producer#flush to complete. If the task's
producer is unhealthy, this can block the herder thread indefinitely.
See the following stacktrace:
{noformat}
java.lang.Thread.State: BLOCKED (on object monitor)
at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.close(OffsetStorageReaderImpl.java:148)
- waiting to lock <0x00000006e6ce0748> (a java.util.HashSet)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.cancel(AbstractWorkerSourceTask.java:288)
at
org.apache.kafka.connect.runtime.Worker.awaitStopTask(Worker.java:1036)
at
org.apache.kafka.connect.runtime.Worker.awaitStopTasks(Worker.java:1054)
at
org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:1082)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$23(DistributedHerder.java:1369)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$1647/0x00007f3d01941b28.call(Unknown
Source)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2240)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
Oct 24 15:55:10 hh-stage-aiven-cdc-kafka-connect-4 java[1180436]: at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
at
java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
at
java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
at
java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at java.lang.Thread.run([email protected]/Thread.java:840){noformat}
and
{noformat}
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000006e4f9d610> (a
java.util.concurrent.CountDownLatch$Sync)
at
java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:211)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire([email protected]/AbstractQueuedSynchronizer.java:715)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1047)
at
java.util.concurrent.CountDownLatch.await([email protected]/CountDownLatch.java:230)
at
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:1075)
at
org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1215)
at
org.apache.kafka.connect.util.KafkaBasedLog$$Lambda$861/0x00007f3d017dc9d8.accept(Unknown
Source)
at java.util.Optional.ifPresent([email protected]/Optional.java:178)
at
org.apache.kafka.connect.util.KafkaBasedLog.flush(KafkaBasedLog.java:345)
at
org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:334)
at
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:295)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.lambda$getFromStore$5(ConnectorOffsetBackingStore.java:348)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore$$Lambda$1277/0x00007f3d0184d600.apply(Unknown
Source)
at java.util.Optional.map([email protected]/Optional.java:260)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.getFromStore(ConnectorOffsetBackingStore.java:348)
at
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.get(ConnectorOffsetBackingStore.java:208)
at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
- locked <0x00000006e6ce0748> (a java.util.HashSet)
at
io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
at
io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:365)
at
io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
at
io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)
at
io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at
org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$990/0x00007f3d01814e08.run(Unknown
Source)
at
java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
at
java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
at
java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at java.lang.Thread.run([email protected]/Thread.java:840){noformat}
> Source task source offset reads can block herder task cancellation
> ------------------------------------------------------------------
>
> Key: KAFKA-17871
> URL: https://issues.apache.org/jira/browse/KAFKA-17871
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 2.5.0
> Reporter: Greg Harris
> Priority: Major
>
> In KAFKA-9051, source task offset reading was modified to allow for
> in-progress read Futures to be cancelled during task shutdown. The
> OffsetStorageReaderImpl#offsetReadFutures Set uses explicitly synchronized
> accesses to prevent data races between task cancellation and connectors
> reading offsets.
> A thread executing the OffsetStorageReaderImpl#offsets method can lock the Set
> and then call Producer#flush inside KafkaBasedLog#flush.
> At the same time, the herder thread may try to shut down the task, time out,
> and call AbstractWorkerSourceTask#cancel. This cancellation attempts to lock
> the same Set, and must wait for the Producer#flush to complete. If the
> task's producer is unhealthy, this can block the herder thread indefinitely.
> See the following stack traces:
>
> {noformat}
> java.lang.Thread.State: BLOCKED (on object monitor)
> at
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.close(OffsetStorageReaderImpl.java:148)
> - waiting to lock <0x00000006e6ce0748> (a java.util.HashSet)
> at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.cancel(AbstractWorkerSourceTask.java:288)
> at
> org.apache.kafka.connect.runtime.Worker.awaitStopTask(Worker.java:1036)
> at
> org.apache.kafka.connect.runtime.Worker.awaitStopTasks(Worker.java:1054)
> at
> org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:1082)
> at
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$23(DistributedHerder.java:1369)
> at
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$1647/0x00007f3d01941b28.call(Unknown
> Source)
> at
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2240)
> at
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
> at
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
> at
> java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
> at
> java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
> at java.lang.Thread.run([email protected]/Thread.java:840){noformat}
> and
>
>
> {noformat}
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
> - parking to wait for <0x00000006e4f9d610> (a
> java.util.concurrent.CountDownLatch$Sync)
> at
> java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:211)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire([email protected]/AbstractQueuedSynchronizer.java:715)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1047)
> at
> java.util.concurrent.CountDownLatch.await([email protected]/CountDownLatch.java:230)
> at
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
> at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:1075)
> at
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1215)
> at
> org.apache.kafka.connect.util.KafkaBasedLog$$Lambda$861/0x00007f3d017dc9d8.accept(Unknown
> Source)
> at java.util.Optional.ifPresent([email protected]/Optional.java:178)
> at
> org.apache.kafka.connect.util.KafkaBasedLog.flush(KafkaBasedLog.java:345)
> at
> org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:334)
> at
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:295)
> at
> org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.lambda$getFromStore$5(ConnectorOffsetBackingStore.java:348)
> at
> org.apache.kafka.connect.storage.ConnectorOffsetBackingStore$$Lambda$1277/0x00007f3d0184d600.apply(Unknown
> Source)
> at java.util.Optional.map([email protected]/Optional.java:260)
> at
> org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.getFromStore(ConnectorOffsetBackingStore.java:348)
> at
> org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.get(ConnectorOffsetBackingStore.java:208)
> at
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
> - locked <0x00000006e6ce0748> (a java.util.HashSet)
> at
> io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
> at
> io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:365)
> at
> io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
> at
> io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)
> at
> io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)
> at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
> at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
> at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
> at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
> at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
> at
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
> at
> org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$990/0x00007f3d01814e08.run(Unknown
> Source)
> at
> java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
> at
> java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
> at java.lang.Thread.run([email protected]/Thread.java:840){noformat}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)