[ https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737709#comment-17737709 ]
Kirk True commented on KAFKA-7143:
----------------------------------

cc [~pnee] [~lianetm]

> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7143
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Raman Gupta
>            Priority: Major
>
> I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which support a style of async programming that avoids the need for callbacks (and existing callback-based APIs can usually be easily adapted to this style with a simple wrapper). With coroutines, continuations are used instead: methods with callbacks are suspended, and resumed once the call is complete.
> With coroutines, while access to the KafkaConsumer is done in a thread-safe way, it does NOT necessarily happen from a single thread -- a different underlying thread may actually execute the code after the suspension point.
> However, the KafkaConsumer includes additional checks to verify not only the thread safety of the client, but that the *same thread* is being used -- if the same thread (by id) is not being used the consumer throws an exception like:
> {code}
> Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
> {code}
> I understand this check is present to protect people from themselves, but I'd like the ability to disable this check so that this code can be used effectively by libraries such as Kotlin coroutines.
> There is a workaround for the above: run the consumer in a coroutine with a single-thread context, which isn't ideal because it dedicates a thread to the consumer.
> However, further problems await -- the `commitAsync` method also cannot be used with coroutines because the callback is never executed and therefore the coroutine is never resumed past the suspension point. Upon investigation, it seems the callback is only executed after future calls to poll, which in a regular polling loop with coroutines will never happen because of the suspension on `commitAsync`, so we have a deadlock. I guess the idea behind this Kafka consumer API design is that consuming new messages may continue, even though commits of previous offsets (which happened an arbitrarily long amount of time in the past) have not necessarily been processed. However, with a coroutine based API, the commitAsync can be sequential before the next poll like commitSync, but happen asynchronously without tying up a client application thread.
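
For illustration only, the two behaviours described in the issue (the same-thread check, and commit callbacks that only fire on a later poll) can be modelled in plain Java without the Kafka client. `ToyConsumer` and `SingleThreadConsumerSketch` below are hypothetical names invented for this sketch; the thread check is a simplified stand-in based on the description above, not the actual KafkaConsumer implementation. The sketch confines all access to one executor thread, which corresponds to the single-thread workaround, and shows that a commit callback stays pending until `poll()` is invoked again.

```java
import java.util.ArrayDeque;
import java.util.ConcurrentModificationException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadConsumerSketch {

    /** Hypothetical stand-in for a thread-affine consumer; NOT the real KafkaConsumer. */
    static final class ToyConsumer {
        // Assumption: the consumer is bound to the thread that created it.
        private final Thread owner = Thread.currentThread();
        private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

        private void checkThread() {
            if (Thread.currentThread() != owner) {
                throw new ConcurrentModificationException(
                        "ToyConsumer is not safe for multi-threaded access");
            }
        }

        /** Queues the callback; it is only run by a later poll(). */
        void commitAsync(Runnable callback) {
            checkThread();
            pendingCallbacks.add(callback);
        }

        /** Runs any queued commit callbacks and returns how many ran. */
        int poll() {
            checkThread();
            int ran = 0;
            Runnable callback;
            while ((callback = pendingCallbacks.poll()) != null) {
                callback.run();
                ran++;
            }
            return ran;
        }
    }

    /** Returns {doneBeforePoll, doneAfterPoll, rejectedOffThread}. */
    static boolean[] demo() {
        // Every consumer call is funnelled through this one dedicated thread.
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            ToyConsumer consumer =
                    CompletableFuture.supplyAsync(ToyConsumer::new, consumerThread).join();

            CompletableFuture<Void> committed = new CompletableFuture<>();
            CompletableFuture.runAsync(
                    () -> consumer.commitAsync(() -> committed.complete(null)),
                    consumerThread).join();
            // Waiting on 'committed' here, before polling again, would never return.
            boolean doneBeforePoll = committed.isDone();

            CompletableFuture.supplyAsync(consumer::poll, consumerThread).join();
            boolean doneAfterPoll = committed.isDone();

            boolean rejectedOffThread;
            try {
                consumer.poll(); // called from the caller's thread, not the owner
                rejectedOffThread = false;
            } catch (ConcurrentModificationException e) {
                rejectedOffThread = true;
            }
            return new boolean[] {doneBeforePoll, doneAfterPoll, rejectedOffThread};
        } finally {
            consumerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("callback done before poll: " + r[0]);
        System.out.println("callback done after poll:  " + r[1]);
        System.out.println("off-thread call rejected:  " + r[2]);
    }
}
```

In this model, a caller that suspends on the commit callback before issuing the next `poll()` waits forever, which is the deadlock the description reports for a sequential coroutine loop.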