Hello guys, I'm struggling to understand how the KafkaIO reader picks its initial offset. I enabled *commitOffsetsInFinalize*, which as I understand it commits the offsets to Kafka after each checkpoint is finalized.
My question is: if I'm using a stable *group.id* and for some reason I have to reset my state (currently because of some errors I'm seeing when I update my app code) and start the app from scratch, will it honor the latest offset committed to the Kafka cluster? (I'm setting enable.auto.commit = false and auto.offset.reset = latest.)

Reading the code here:
https://github.com/apache/beam/blob/de4645d45073004b3b7d196de7ddf40ad6429eb0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L679
it seems to be getting the consumer's position (not the committed offset), which is not the same thing:
https://stackoverflow.com/questions/47543771/kafkaconsumer-position-vs-committed

And I see that commitSync is called here:
https://github.com/apache/beam/blob/de4645d45073004b3b7d196de7ddf40ad6429eb0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L600

So I don't really understand where resuming from the committed offset happens, but maybe I'm misunderstanding something.

Regards.
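P.S. To make the position vs. committed distinction concrete, here is a toy model of how I understand a plain Kafka consumer chooses its starting position when nothing has called seek(): it uses the group's committed offset if one exists, and otherwise falls back to auto.offset.reset. The names OffsetModel, initialPosition and ResetPolicy are made up for illustration; this is not Kafka or Beam code, and whether KafkaIO relies on this behaviour after a restart from scratch is exactly the part I'm unsure about.

```java
import java.util.OptionalLong;

/** Toy model only: none of these names exist in the Kafka client or in Beam. */
public class OffsetModel {

    /** Stand-in for the two common values of auto.offset.reset. */
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Offset a fresh consumer would start reading from, assuming no explicit seek.
     *
     * @param committed last offset committed for the group on this partition, if any
     * @param policy    assumed auto.offset.reset setting
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset one past the last record in the partition
     */
    static long initialPosition(OptionalLong committed, ResetPolicy policy,
                                long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset for the group wins over the reset policy.
            return committed.getAsLong();
        }
        // No committed offset: the reset policy decides where to start.
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Group previously committed offset 120; the log spans offsets [0, 500).
        System.out.println(
            initialPosition(OptionalLong.of(120), ResetPolicy.LATEST, 0, 500)); // prints 120

        // Brand-new group with nothing committed and reset policy "latest".
        System.out.println(
            initialPosition(OptionalLong.empty(), ResetPolicy.LATEST, 0, 500)); // prints 500
    }
}
```

If that model is right, reading the position at start-up and reading the committed offset would give the same answer for an existing group, and would only differ once the reader has moved ahead of its last commit.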
