[ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737709#comment-17737709
 ] 

Kirk True commented on KAFKA-7143:
----------------------------------

cc [~pnee] [~lianetm] 

> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7143
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Raman Gupta
>            Priority: Major
>
> I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
> [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
> supports a style of async programming that avoids the need for callbacks (and 
> existing callback-based API's are usually easily be adapted to this style 
> with a simple wrapper). With coroutines, continuations are used instead: 
> methods with callbacks are suspended, and resumed once the call is complete. 
> With coroutines, while access to the KafkaConsumer is done in a thread-safe 
> way, it does NOT necessarily happen from a single thread -- a different 
> underlying thread may actually execute the code after the suspension point.
> However, the KafkaConsumer includes additional checks to verify not only the 
> thread safety of the client, but that the *same thread* is being used -- if 
> the same thread (by id) is not being used the consumer throws an exception 
> like:
> {code}
> Exception in thread "ForkJoinPool.commonPool-worker-25" 
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
> {code}
> I understand this check is present to protect people from themselves, but I'd 
> like the ability to disable this check so that this code can be used 
> effectively by libraries such as Kotlin coroutines.
> There is a workaround for the above: run the consumer in a coroutine with a 
> single-thread context, which isn't ideal because it dedicates a thread to the 
> consumer.
> However, further problems await -- the `commitAsync` method also cannot be 
> used with coroutines because the callback is never executed and therefore the 
> coroutine is never resumed past the suspension point. Upon investigation, it 
> seems the callback is only executed after future calls to poll, which in a 
> regular polling loop with coroutines will never happen because of the 
> suspension on `commitAsync`, so we have a deadlock. I guess the idea behind 
> this Kafka consumer API design is that consuming new messages may continue, 
> even though commits of previous offsets (which happened an arbitrarily long 
> amount of time in the past) have not necessarily been processed. However, 
> with a coroutine based API, the commitAsync can be sequential before the next 
> poll like commitSync, but happen asynchronously without tying up a client 
> application thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to