Raman Gupta created KAFKA-7143:
----------------------------------
Summary: Cannot use KafkaConsumer with Kotlin coroutines due to thread id check
Key: KAFKA-7143
URL: https://issues.apache.org/jira/browse/KAFKA-7143
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 1.1.0
Reporter: Raman Gupta
I am using the new KafkaConsumer from Kotlin. My application makes use of Kotlin
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which
support a style of async programming that avoids the need for callbacks (and
existing callback-based APIs such as Kafka's can easily be adapted to this).
With coroutines, a call that would otherwise take a callback suspends the
coroutine, which is resumed once the call is complete. With this approach,
while access to the KafkaConsumer is done in a thread-safe way, it does NOT
happen from a single thread -- a different underlying thread may actually
execute the code after the suspension point.
However, the KafkaConsumer includes additional checks to verify not only the
thread safety of the client, but also that the *same thread* is being used -- if
the same thread (by id) is not being used, the consumer throws an exception like:
{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}
I understand this check is present to protect people from themselves, but I'd
like the ability to disable this check so that this code can be used
effectively by libraries such as Kotlin coroutines.
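Until such an option exists, one possible workaround is to route every consumer call through a single dedicated thread, so the calls always come from the same thread. The following is only a minimal sketch of that idea in plain Java: `ConfinedExecutor` and the thread name `consumer-owner` are hypothetical names chosen for illustration and are not part of Kafka, and the example deliberately uses no Kafka classes so that it stays self-contained.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper (not part of Kafka): runs every submitted action on one
// dedicated thread, so an object that must stay on a single thread always does.
final class ConfinedExecutor implements AutoCloseable {
    private final ExecutorService owner = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "consumer-owner");
        t.setDaemon(true); // do not keep the JVM alive just for this thread
        return t;
    });

    // Runs the action on the owner thread and blocks the caller until it finishes.
    <T> T call(Callable<T> action) {
        try {
            return owner.submit(action).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for owner thread", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("action failed on owner thread", e.getCause());
        }
    }

    @Override
    public void close() {
        owner.shutdown();
    }

    public static void main(String[] args) {
        try (ConfinedExecutor confined = new ConfinedExecutor()) {
            // In a real application the lambdas would call the consumer,
            // for example poll or commitAsync; here they only report the thread.
            System.out.println(confined.call(() -> Thread.currentThread().getName()));
            System.out.println(confined.call(() -> Thread.currentThread().getName()));
        }
    }
}
```

In Kotlin, the same single-thread executor could presumably be wrapped with the `asCoroutineDispatcher()` extension from kotlinx.coroutines, so that coroutine code touching the consumer always resumes on that one thread. This costs a thread hop per call, which is why a way to disable the check would still be preferable.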