Hello ! I am working on an exactly once stream processors in Python, using aiokafka client library. My program stores a state in memory, that is recovered from a changelog topic, like in kafka streams.
On each processing loop, I am consuming messages, producing messages to an output topics and to my changelog topic, within a transaction. When I need to restart a runner, to restore the state in memory, I have a routine consuming the changelog topic from the beginning to the "end" with a read_commited isolation level. Here I am struggling to define when to stop my recovery : * my current (maybe) working solution is to loop over "poll" until poll is not returning any messages anymore * I tried to do more something based on the end offests, the checking the consumer position, but with control messages at the end of the partition, I am running into an issue where position is one below end offsets, and doesn't go further I had a quick look to https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java but it is a bit hard to figure out what is going on here Best regards, Vincent