Well, `kafka-consumer-group.sh` can only display the difference between "committed offset" and "end offset". It cannot know what the "right" offset to be committed is. It's really the responsibility of the consumers to commit correctly.

-Matthias

On 7/27/23 1:03 AM, Vincent Maurin wrote:
Thank you Matthias for your answer, I open an issue on the aiokafka project as follow up, let's see how we can resolve it there https://github.com/aio-libs/aiokafka/issues/911

As mentioned in the issue, some tools like kafka-consumer-groups.sh also display a lag of "1" in this kind of situation

Best regards,

Vincent

On 13/06/2023 17:27, Matthias J. Sax wrote:
Sounds like a bug in aiokafka library to me.

If the last message in a topic partition is a tx-marker, the consumer should step over it, and report the correct position after the marker.

The official KafkaConsumer (ie, the Java one), does the exact same thing.


-Matthias

On 5/30/23 8:41 AM, Vincent Maurin wrote:
Hello !

I am working on an exactly once stream processors in Python, using
aiokafka client library. My program stores a state in memory, that is
recovered from a changelog topic, like in kafka streams.

On each processing loop, I am consuming messages, producing messages
to an output topics and to my changelog topic, within a transaction.

When I need to restart a runner, to restore the state in memory, I
have a routine consuming the changelog topic from the beginning to the
"end" with a read_commited isolation level. Here I am struggling to
define when to stop my recovery :
* my current (maybe) working solution is to loop over "poll" until
poll is not returning any messages anymore
* I tried to do more something based on the end offests, the checking
the consumer position, but with control messages at the end of the
partition, I am running into an issue where position is one below end
offsets, and doesn't go further

I had a quick look to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
but it is a bit hard to figure out what is going on here

Best regards,
Vincent

Reply via email to