I answered a similar question on SO a while ago [1], and I hope it helps.
“By default, pipeline.apply(KafkaIO.read()...) will return a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an offset from the KafkaRecord metadata and commit it manually in the way that you need (just don't forget to disable AUTO_COMMIT in KafkaIO.read()). By manual, I mean that you should instantiate your own Kafka client in your DoFn, process the input element (the KafkaRecord<K, V> that was read before), fetch its offset from the KafkaRecord, and commit it with your own client. Though, you need to make sure that the call to the external API and the offset commit are atomic to prevent potential data loss (if that's critical)."

[1] https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880

— Alexey

> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <juancalvoferran...@gmail.com> wrote:
>
> Thanks Luke for your quick response. I see, that makes sense. Now I have two new questions, if I may:
>
> a) How can I get the offsets I want to commit? My investigation is now going through getCheckpointMark(); is this correct?
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>
> b) With these offsets, I will create a client at the end of the pipeline with the Kafka library, and use methods such as commitSync() and commitAsync(). Is this correct?
> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>
> Thanks!!!
>
> Juan
>
> On Fri, 10 Dec 2021 at 01:07, Luke Cwik <lc...@google.com> wrote:
>
> commitOffsetsInFinalize is about committing the offset after the output has been durably persisted for the bundle containing the Kafka read. A bundle represents a unit of work over a subgraph of the pipeline. You will want to ensure that commitOffsetsInFinalize is disabled and that the Kafka consumer config doesn't auto-commit. This ensures that KafkaIO.Read doesn't commit the offsets; it is then up to your PTransform to perform the committing.
>
> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz <juancalvoferran...@gmail.com> wrote:
>
> Morning!
>
> First of all, thanks for all the incredible work you do, it is amazing. Secondly, I'm reaching out for some help or guidance on manually committing records. I want to do this so I can commit the record at the end of the pipeline, not in the read() of KafkaIO.
>
> Bearing in mind what I have read in this post:
> https://lists.apache.org/list?user@beam.apache.org:2021-9:user@beam.apache.org%20kafka%20commit
> and thinking of a pipeline similar to the one described, I understand we can use commitOffsetsInFinalize() to commit offsets in the read(). What I don't understand is how this helps commit the offset if we want to do it at the end, not in the reading. Thanks. All comments and suggestions are more than welcome.
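The read-side setup Luke describes (commitOffsetsInFinalize left off, and auto-commit disabled in the consumer config) might look like the following sketch; the bootstrap server, topic, and class names are placeholders, not from the thread, and it assumes the Beam KafkaIO and Kafka client dependencies are on the classpath:

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadWithoutAutoCommit {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // By default KafkaIO.read() yields KafkaRecord<K, V>, so topic,
    // partition, and offset stay available to downstream transforms.
    PCollection<KafkaRecord<String, String>> records =
        p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("my-topic")                  // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Do NOT call .commitOffsetsInFinalize() here, and make
                // sure the consumer itself does not auto-commit either:
                .withConsumerConfigUpdates(
                    Map.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)));

    // ... downstream transforms, ending with a manual-commit DoFn ...
    p.run().waitUntilFinish();
  }
}
```

With this in place KafkaIO.Read commits nothing, so the committing is entirely up to a transform at the end of the pipeline.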
> :)
>
> Juan Calvo Ferrándiz
> Data Engineer
> LinkedIn: https://www.linkedin.com/in/juan-calvo-ferrandiz/
> GitHub: https://github.com/juancalvof
> Medium: https://medium.com/@juancalvoferrandiz
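The manual-commit DoFn Alexey describes could then be sketched like this. The group id, server address, and the callExternalApi helper are illustrative assumptions, not from the thread; note also that committing per element with commitSync is simple but slow, and that the external call and the commit are still not atomic:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch of the manual commit: process the element, then commit its
// offset with a plain Kafka client owned by the DoFn.
public class ManualCommitFn extends DoFn<KafkaRecord<String, String>, Void> {

  private transient KafkaConsumer<String, String> consumer;

  @Setup
  public void setup() {
    // Placeholder config; the group.id should match the group KafkaIO
    // reads with if the committed offsets are to be visible to it.
    Map<String, Object> config =
        Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "my-group",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumer = new KafkaConsumer<>(config);
  }

  @ProcessElement
  public void processElement(@Element KafkaRecord<String, String> record) {
    // 1. Call the external API first, and only commit if it succeeds,
    //    since the call and the commit are not atomic.
    // callExternalApi(record.getKV());  // hypothetical helper

    // 2. Commit the offset of the record just processed. Kafka expects
    //    the offset of the *next* record to read, hence the +1.
    TopicPartition tp =
        new TopicPartition(record.getTopic(), record.getPartition());
    consumer.commitSync(
        Collections.singletonMap(
            tp, new OffsetAndMetadata(record.getOffset() + 1)));
  }

  @Teardown
  public void teardown() {
    if (consumer != null) {
      consumer.close();
    }
  }
}
```

For throughput you would normally batch commits (e.g. per bundle, in @FinishBundle) rather than issue one commitSync per element, but the per-element version keeps the sketch short.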