I answered the similar questions on SO a while ago [1], and I hope it will help.

“By default, pipeline.apply(KafkaIO.read()...) will return a 
PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an 
offset from KafkaRecord metadata and commit it manually in a way that you need 
(just don't forget to disable AUTO_COMMIT in KafkaIO.read()).

By manual way, I mean that you should instantiate your own Kafka client in your 
DoFn, process input element (as KafkaRecord<K, V>), that was read before, fetch 
an offset from KafkaRecord and commit it with your own client. 

Though, you need to make sure that a call to external API and offset commit 
will be atomic to prevent potential data loss (if it's critical)."

[1] 
https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
 
<https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880>

—
Alexey

> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <juancalvoferran...@gmail.com> 
> wrote:
> 
> Thanks Luke for your quick response. I see, that makes sense. Now I have two 
> new questions if I may: 
> a) How I can get the offsets I want to commit. My investigation now is going 
> throw getCheckpointMark(), is this correct? 
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>  
> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource>
> 
> b) With these offsets, I will create a client at the of the pipeline, with 
> Kafka library, and methods such as commitSync() and commitAsync(). Is this 
> correct? 
> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>  
> <https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of>
> 
> Thanks!!!
> 
> Juan 
> 
> 
> On Fri, 10 Dec 2021 at 01:07, Luke Cwik <lc...@google.com 
> <mailto:lc...@google.com>> wrote:
> commitOffsetsInFinalize is about committing the offset after the output has 
> been durably persisted for the bundle containing the Kafka Read. The bundle 
> represents a unit of work over a subgraph of the pipeline. You will want to 
> ensure the commitOffsetsInFinalize is disabled and that the Kafka consumer 
> config doesn't auto commit automatically. This will ensure that KafkaIO.Read 
> doesn't commit the offsets. Then it is upto your PTransform to perform the 
> committing.
> 
> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz 
> <juancalvoferran...@gmail.com <mailto:juancalvoferran...@gmail.com>> wrote:
> Morning!
> 
> First of all, thanks for all the incredible work you do, is amazing. Then, 
> secondly, I reach you for some help or guidance to manually commit records. I 
> want to do this so I can commit the record and the end of the pipeline, and 
> not in the read() of the KafkaIO.
> 
> Bearing in mind what I have read in this post: 
> https://lists.apache.org/list?user@beam.apache.org:2021-9:user@beam.apache.org%20kafka%20commit
>  
> <https://lists.apache.org/list?user@beam.apache.org:2021-9:user@beam.apache.org%20kafka%20commit>
>  , and thinking of a pipeline similar to the one described, I understand we 
> can use commitOffsetsInFinalize() to commit offsets in the read(). What I 
> don't understand is how this helps to commit the offset if we want to do this 
> at the end, not in the reading.     Thanks. All comments and suggestions are 
> more than welcome. :) 
> 
> Juan Calvo Ferrándiz
> Data Engineer
> Go to LINKEDIN  <https://www.linkedin.com/in/juan-calvo-ferrandiz/>
> Go to GITHUB <https://github.com/juancalvof>
> Go to MEDIUM <https://medium.com/@juancalvoferrandiz> 
> 

Reply via email to