Hi,

I am trying to do async commits from the rebalance listener, to be precise, in method onPartitionsRevoked. The idea is to wait until all commits for the current epoch are done before the rebalance barrier and by doing so prevent duplicate processing.

It is not so hard to call commitAsync(offsets, callback), but what method should be used so that the Kafka client gets a chance to call the callback?

I tried the following:

*1. Call **commitAsync(Collections.emptyMap, callback)**
*

Unfortunately when you call commitAsync with an empty offsets map, it doesn't call the callbacks of previous commits.

There is a PR from 2020 that would fix this issue: https://github.com/apache/kafka/pull/9111. This PR was closed without merging. Should this PR be reconsidered?

*2. Pause all partitions and call **poll(0)*

Doesn't work; you'll get a "KafkaConsumer is not safe for multi-threaded access" exception.

*3. Call commitSync(**Collections.emptyMap, callback)**
*

Behaves the same as under point 1.

*4. Repeated calls to **commitAsync(offsets, callback)**
*

This time we keep calling commitAsync with the same offsets until these offsets are committed. Unfortunately, this never ends. Either because commitAsync doesn't call the callbacks, or because this just stacks up more commits to complete.

I looked at the other methods on the consumer API but I didn't find anything that looked suitable for this use case.


So to repeat the question:

What method should I invoke (from the onPartitionsRevoked callback), to make the Kafka client invoke the callback of earlier async commits?


Some context: I am working on zio-kafka; a completely async library that provides a concurrent streaming layer on top of the Java client.

Thanks,
    Erik.


--
Erik van Oosten
e.vanoos...@grons.nl
https://day-to-day-stuff.blogspot.com

Reply via email to