[ https://issues.apache.org/jira/browse/KAFKA-17871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892627#comment-17892627 ]
Greg Harris commented on KAFKA-17871:
-------------------------------------
The herder thread should avoid blocking on locks that the task could hold
indefinitely.
Either OffsetStorageReaderImpl should narrow its critical sections and hold
the lock for less time, or cancellation could be changed so that it does not
require the lock, or cancellation could interrupt the blocked operation.
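One way to read the first option is the following minimal sketch, built around a hypothetical `NarrowLockReader` class that is not Kafka's `OffsetStorageReaderImpl`. The monitor is held only while a future is registered, removed, or cancelled, never while waiting for its result, so `close()` cannot end up waiting on a read. The sketch assumes that starting a read returns a `CompletableFuture` without blocking. In the second trace the blocking `Producer#flush` happens while the read is being started, so a real change would also have to move that call out of the locked region.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical reader, NOT Kafka's OffsetStorageReaderImpl: it only illustrates
// holding the monitor for registration and cancellation, never while waiting.
public class NarrowLockReader {
    private final Set<Future<?>> inFlight = new HashSet<>(); // guarded by its own monitor
    private boolean closed = false;                          // guarded by inFlight

    // Assumption: startRead returns a future without blocking; only get() waits.
    public <T> T read(Supplier<CompletableFuture<T>> startRead)
            throws ExecutionException, InterruptedException {
        CompletableFuture<T> future;
        synchronized (inFlight) {
            if (closed) {
                throw new IllegalStateException("reader closed");
            }
            future = startRead.get();
            inFlight.add(future);
        }
        try {
            return future.get(); // waits WITHOUT holding the monitor
        } finally {
            synchronized (inFlight) {
                inFlight.remove(future);
            }
        }
    }

    // Contends only with the short sections in read(), so it never waits on a read.
    public void close() {
        synchronized (inFlight) {
            closed = true;
            for (Future<?> f : inFlight) {
                f.cancel(true); // a cancelled future makes get() throw CancellationException
            }
            inFlight.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        NarrowLockReader reader = new NarrowLockReader();
        CompletableFuture<String> hung = new CompletableFuture<>(); // never completes, like a stuck flush
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean cancelled = new AtomicBoolean(false);

        Thread task = new Thread(() -> {
            try {
                reader.read(() -> {
                    started.countDown(); // runs inside read()'s synchronized block
                    return hung;
                });
            } catch (CancellationException e) {
                cancelled.set(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        task.start();
        started.await();
        reader.close(); // gets the monitor once the future is registered, then cancels it
        task.join(5_000);
        System.out.println("read cancelled: " + cancelled.get());
        System.out.println("task still running: " + task.isAlive());
    }
}
```

Running `main` should print `read cancelled: true` and `task still running: false`: `close()` acquires the monitor only after the task has finished registering its future, then cancels that future, which makes the task's `future.get()` throw `CancellationException` instead of waiting forever.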
> Source task source offset reads can block herder task cancellation
> ------------------------------------------------------------------
>
> Key: KAFKA-17871
> URL: https://issues.apache.org/jira/browse/KAFKA-17871
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 2.5.0
> Reporter: Greg Harris
> Priority: Major
>
> In KAFKA-9051, source task offset reads were modified so that in-progress
> read Futures can be cancelled during task shutdown. The
> OffsetStorageReaderImpl#offsetReadFutures Set uses explicitly synchronized
> accesses to prevent data races between task cancellation and connectors
> reading offsets.
> A thread executing OffsetStorageReaderImpl#offsets can lock the Set and
> then call Producer#flush inside KafkaBasedLog#flush.
> At the same time, the herder thread may try to shut down the task, time out,
> and call AbstractWorkerSourceTask#cancel. This cancellation attempts to lock
> the same Set, and so must wait for Producer#flush to complete. If the
> task's producer is unhealthy, this can block the herder thread indefinitely.
> See the following stacktraces:
> {noformat}
> java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.close(OffsetStorageReaderImpl.java:148)
> - waiting to lock <0x00000006e6ce0748> (a java.util.HashSet)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.cancel(AbstractWorkerSourceTask.java:288)
> at org.apache.kafka.connect.runtime.Worker.awaitStopTask(Worker.java:1036)
> at org.apache.kafka.connect.runtime.Worker.awaitStopTasks(Worker.java:1054)
> at org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:1082)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$23(DistributedHerder.java:1369)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$1647/0x00007f3d01941b28.call(Unknown Source)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2240)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
> at java.util.concurrent.Executors$RunnableAdapter.call(java.base/Executors.java:539)
> at java.util.concurrent.FutureTask.run(java.base/FutureTask.java:264)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base/ThreadPoolExecutor.java:1136)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base/ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(java.base/Thread.java:840){noformat}
>
> {noformat}
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park(java.base/Native Method)
> - parking to wait for <0x00000006e4f9d610> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(java.base/LockSupport.java:211)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base/AbstractQueuedSynchronizer.java:715)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base/AbstractQueuedSynchronizer.java:1047)
> at java.util.concurrent.CountDownLatch.await(java.base/CountDownLatch.java:230)
> at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
> at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:1075)
> at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1215)
> at org.apache.kafka.connect.util.KafkaBasedLog$$Lambda$861/0x00007f3d017dc9d8.accept(Unknown Source)
> at java.util.Optional.ifPresent(java.base/Optional.java:178)
> at org.apache.kafka.connect.util.KafkaBasedLog.flush(KafkaBasedLog.java:345)
> at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:334)
> at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:295)
> at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.lambda$getFromStore$5(ConnectorOffsetBackingStore.java:348)
> at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore$$Lambda$1277/0x00007f3d0184d600.apply(Unknown Source)
> at java.util.Optional.map(java.base/Optional.java:260)
> at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.getFromStore(ConnectorOffsetBackingStore.java:348)
> at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.get(ConnectorOffsetBackingStore.java:208)
> at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
> - locked <0x00000006e6ce0748> (a java.util.HashSet)
> at io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
> at io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:365)
> at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
> at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)
> at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
> at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
> at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$990/0x00007f3d01814e08.run(Unknown Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(java.base/Executors.java:539)
> at java.util.concurrent.FutureTask.run(java.base/FutureTask.java:264)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base/ThreadPoolExecutor.java:1136)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base/ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(java.base/Thread.java:840){noformat}
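The interaction between these two traces can be reproduced without Kafka. In the minimal sketch that follows, the class and variable names are hypothetical, and a `CountDownLatch` that is never released stands in for `Producer#flush` on an unhealthy producer. A "task" thread holds the monitor of a `HashSet` while parked, and a "herder" thread that tries to synchronize on the same set ends up `BLOCKED`. The sketch also shows why the third option in the comment would interrupt the task rather than the herder: a thread `BLOCKED` entering a `synchronized` block is not released by an interrupt, while `CountDownLatch.await` (reached through `acquireSharedInterruptibly` in the second trace) is. Whether a real `KafkaProducer#flush` unwinds cleanly on interrupt is not modeled here.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

// Hypothetical reproduction: "task" and "herder" stand in for the two threads in
// the traces; the latch stands in for Producer#flush on an unhealthy producer.
public class MonitorBlockingDemo {
    public static void main(String[] args) throws Exception {
        Set<Object> offsetReadFutures = new HashSet<>();
        CountDownLatch flushCompleted = new CountDownLatch(1); // never counted down
        CountDownLatch lockHeld = new CountDownLatch(1);

        // Task thread: takes the monitor, then parks while still holding it.
        Thread task = new Thread(() -> {
            synchronized (offsetReadFutures) {
                lockHeld.countDown();
                try {
                    flushCompleted.await(); // WAITING (parking), monitor still held
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag; leaving the block releases the monitor
                }
            }
        }, "task");

        // Herder thread: needs the same monitor to cancel in-flight reads.
        Thread herder = new Thread(() -> {
            synchronized (offsetReadFutures) {
                offsetReadFutures.clear();
            }
        }, "herder");

        task.start();
        lockHeld.await();
        herder.start();

        // Poll until both threads reach the states seen in the traces.
        Thread.State taskState;
        Thread.State herderState;
        do {
            Thread.sleep(10);
            taskState = task.getState();
            herderState = herder.getState();
        } while (taskState != Thread.State.WAITING || herderState != Thread.State.BLOCKED);
        System.out.println("task state: " + taskState);
        System.out.println("herder state: " + herderState);

        // Interrupting the herder would not help: a thread BLOCKED on monitor entry
        // is not released by an interrupt. Interrupting the task ends its await().
        task.interrupt();
        task.join();
        herder.join();
        System.out.println("herder finished: " + !herder.isAlive());
    }
}
```

Running it should print `task state: WAITING`, `herder state: BLOCKED`, and `herder finished: true`, mirroring the `WAITING (parking)` and `BLOCKED (on object monitor)` states above.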