Hi,
I am trying to do async commits from the rebalance listener, to be
precise, in method onPartitionsRevoked. The idea is to wait until all
commits for the current epoch are done before the rebalance barrier and
by doing so prevent duplicate processing.
It is not so hard to call commitAsync(offsets, callback), but what
method should be used so that the Kafka client gets a chance to call the
callback?
I tried the following:
*1. Call **commitAsync(Collections.emptyMap, callback)**
*
Unfortunately when you call commitAsync with an empty offsets map, it
doesn't call the callbacks of previous commits.
There is a PR from 2020 that would fix this issue:
https://github.com/apache/kafka/pull/9111. This PR was closed without
merging. Should this PR be reconsidered?
*2. Pause all partitions and call **poll(0)*
Doesn't work; you'll get a "KafkaConsumer is not safe for multi-threaded
access" exception.
*3. Call commitSync(**Collections.emptyMap, callback)**
*
Behaves the same as under point 1.
*4. Repeated calls to **commitAsync(offsets, callback)**
*
This time we keep calling commitAsync with the same offsets until these
offsets are committed. Unfortunately, this never ends. Either because
commitAsync doesn't call the callbacks, or because this just stacks up
more commits to complete.
I looked at the other methods on the consumer API but I didn't find
anything that looked suitable for this use case.
So to repeat the question:
What method should I invoke (from the onPartitionsRevoked callback), to
make the Kafka client invoke the callback of earlier async commits?
Some context: I am working on zio-kafka; a completely async library that
provides a concurrent streaming layer on top of the Java client.
Thanks,
Erik.
--
Erik van Oosten
e.vanoos...@grons.nl
https://day-to-day-stuff.blogspot.com