Hello guys

I'm struggling to understand how the KafkaIO reader determines its
initial offset. I enabled *commitOffsetsInFinalize*, which I understand
commits the offsets to Kafka each time a checkpoint is finalized.
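
This is roughly how I picture *commitOffsetsInFinalize*. It is a hypothetical model (my assumption, not Beam's code): `ModelCheckpointMark` is a made-up stand-in for the reader's checkpoint mark, and the map passed to `finalizeCheckpoint` plays the role of the group's committed offsets on the broker. Nothing is committed while records are read; the next offset per partition is committed only when the checkpoint is finalized.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of committing offsets when a checkpoint is finalized. */
final class FinalizeCommitModel {
    private FinalizeCommitModel() {}

    /** Made-up stand-in for a checkpoint mark: next offset to read, per partition. */
    static final class ModelCheckpointMark {
        private final Map<Integer, Long> nextOffsets;

        ModelCheckpointMark(Map<Integer, Long> nextOffsets) {
            // Defensive copy: the mark captures the reader's progress at checkpoint time.
            this.nextOffsets = new HashMap<>(nextOffsets);
        }

        /**
         * Invoked once the runner has durably persisted the checkpoint.
         * The map argument plays the role of the group's committed offsets.
         */
        void finalizeCheckpoint(Map<Integer, Long> groupCommittedOffsets) {
            // Kafka convention: a committed offset is the next record to consume,
            // i.e. the last processed offset + 1.
            groupCommittedOffsets.putAll(nextOffsets);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new HashMap<>();
        // The reader processed offsets 0..9 of partition 0, so its next offset is 10.
        ModelCheckpointMark mark = new ModelCheckpointMark(Map.of(0, 10L));
        System.out.println(committed); // prints {} (nothing committed yet)
        mark.finalizeCheckpoint(committed);
        System.out.println(committed); // prints {0=10}
    }
}
```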


My question is: if I'm using a stable *group.id* and for some reason I
have to discard my state (currently due to some errors I'm seeing when I
update my app code) and start the app from scratch, will it honor the
latest offset committed to the Kafka cluster? (I'm setting
enable.auto.commit = false and auto.offset.reset = latest.)
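
For reference, the consumer settings above as a plain map. This is a minimal sketch assuming a recent Beam version, where extra consumer properties go to `KafkaIO.Read` through `withConsumerConfigUpdates(...)` alongside `.commitOffsetsInFinalize()`. The Kafka property for auto-commit is `enable.auto.commit`; the class name `ConsumerSettings` and the group id are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper collecting the consumer settings described above. */
final class ConsumerSettings {
    private ConsumerSettings() {}

    static Map<String, Object> build(String groupId) {
        Map<String, Object> config = new HashMap<>();
        // A stable group id is what ties committed offsets to this application.
        config.put("group.id", groupId);
        // Disable Kafka's periodic auto-commit; with commitOffsetsInFinalize()
        // the commits come from checkpoint finalization instead.
        config.put("enable.auto.commit", false);
        // Consulted only when there is no valid committed offset for a partition.
        config.put("auto.offset.reset", "latest");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build("my-stable-group"));
    }
}
```

It would be wired in as `.withConsumerConfigUpdates(ConsumerSettings.build("my-stable-group"))` on the read, next to `.commitOffsetsInFinalize()`.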

Reading the code here:
https://github.com/apache/beam/blob/de4645d45073004b3b7d196de7ddf40ad6429eb0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L679

it seems to read the consumer's position (not the committed offset),
which is certainly different:
https://stackoverflow.com/questions/47543771/kafkaconsumer-position-vs-committed
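
To make the position-versus-committed distinction concrete, here is a simplified, hypothetical model of which offset a consumer starts fetching from for one partition. It reflects my understanding of the `KafkaConsumer` behavior (an explicit seek wins; otherwise a consumer with a `group.id` starts from the group's committed offset; only if there is none does `auto.offset.reset` apply). It ignores out-of-range offsets and is not Kafka's or Beam's code; `StartOffsetModel` and `resolve` are made-up names. Under this model, `position()` on a partition that was never seeked would resolve to the committed offset. Whether KafkaIO relies on exactly that is the part I'd like confirmed.

```java
import java.util.OptionalLong;

/**
 * Hypothetical, simplified model of which offset a consumer starts fetching
 * from for one partition. Not Kafka's or Beam's actual code; out-of-range
 * offsets are ignored.
 */
final class StartOffsetModel {
    enum ResetPolicy { EARLIEST, LATEST }

    private StartOffsetModel() {}

    static long resolve(OptionalLong seekOffset, OptionalLong committedOffset,
                        long earliest, long latest, ResetPolicy reset) {
        // 1. An explicit seek (e.g. an offset restored from a checkpoint) wins.
        if (seekOffset.isPresent()) {
            return seekOffset.getAsLong();
        }
        // 2. Otherwise, with a group.id, the group's committed offset is used.
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        // 3. Only when neither exists does auto.offset.reset decide.
        return reset == ResetPolicy.EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        // Fresh start (no checkpoint), but the group has committed offset 42.
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(42),
                0, 100, ResetPolicy.LATEST)); // prints 42
        // Nothing committed either: "latest" means the log end offset.
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.empty(),
                0, 100, ResetPolicy.LATEST)); // prints 100
    }
}
```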


And I see that commitSync is called here:
https://github.com/apache/beam/blob/de4645d45073004b3b7d196de7ddf40ad6429eb0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L600

So I don't really understand where resuming from the committed offset
happens, but maybe I'm misunderstanding something.

Regards.
