Re: Kafka manually commit offsets

2021-12-14 Thread Juan Calvo Ferrándiz
Super! Thanks for all this info. I am now testing windowing on a per-key basis, with a consumer per topic/partition/schema. This way, the bundle of data from a specific window seems to wait until the previous one has been processed, independently of the number of records. *Juan* …
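
A rough reconstruction of the experiment described above, with illustrative names only: each record's payload is keyed by its topic and partition, so every fixed window emits one group per key that can be handled as a unit (ordering between windows still depends on the runner).

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class PerKeyWindowing {

  /** Groups record payloads per topic/partition key within fixed 30-second windows. */
  static PCollection<KV<String, Iterable<String>>> windowPerKey(
      PCollection<KafkaRecord<String, String>> records) {
    return records
        // The 30-second window size is arbitrary and only here for illustration.
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
        .apply(MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((KafkaRecord<String, String> rec) ->
                KV.of(rec.getTopic() + ":" + rec.getPartition(), rec.getKV().getValue())))
        // One output element per key and window, holding that window's payloads together.
        .apply(GroupByKey.<String, String>create());
  }
}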

Re: Kafka manually commit offsets

2021-12-13 Thread Vincent Marquez
What I mean is: if you want to commit offsets only *after* a KafkaRecord is processed, then you need to keep parallelism at the number of partitions, as offsets are monotonically increasing *per partition*. So if you only have one partition and then split processing into two 'threads', then if T1 is handling offsets …

Re: Kafka manually commit offsets

2021-12-10 Thread Vincent Marquez
If you want to ensure at-least-once processing, I think the *maximum* amount of parallelization you can have would be the number of partitions you have, so you'd want to group by partition, process a bundle of that partition, then commit the last offset for that partition. *~Vincent*
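
A minimal sketch of this per-partition pattern, assuming a Java pipeline, a single topic (so the partition number alone identifies a partition), and illustrative names: already-processed records are windowed, keyed by partition, and reduced to the highest offset seen, which a later step could commit as offset + 1, for example with the kind of committer sketched under Alexey's reply below.

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class LastOffsetPerPartition {

  /** Reduces already-processed records to the highest offset seen per partition and window. */
  static PCollection<KV<Integer, Long>> lastOffsetPerPartition(
      PCollection<KafkaRecord<String, String>> processedRecords) {
    return processedRecords
        // An unbounded collection needs a window (or trigger) before a per-key combine;
        // the one-minute size here is arbitrary.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.longs()))
            .via((KafkaRecord<String, String> rec) ->
                KV.of(rec.getPartition(), rec.getOffset())))
        // Offsets grow monotonically within a partition, so the max is the last one processed.
        // A downstream DoFn can then commit maxOffset + 1 for each partition.
        .apply(Max.<Integer>longsPerKey());
  }
}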

Re: Kafka manually commit offsets

2021-12-10 Thread Juan Calvo Ferrándiz
Thanks Alexey! I understand. Continuing to think about possible ways of committing records, I was wondering what happens in this scenario: when processing windows of data, do they get processed in sequential order, or is it possible for them to be processed out of order? For example, Window 1 c…

Re: Kafka manually commit offsets

2021-12-10 Thread Alexey Romanenko
I answered a similar question on SO a while ago [1], and I hope it will help. “By default, pipeline.apply(KafkaIO.read()...) will return a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an offset from the KafkaRecord metadata and commit it manually in the way that you need (just don't f…
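
A minimal sketch of one way to do this, with placeholder broker, topic and group names ("localhost:9092", "my-topic", "my-group"); the commit here is issued from a downstream DoFn with a plain KafkaConsumer rather than by KafkaIO itself, which is only one possible reading of the advice above, not necessarily what the linked answer does.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitSketch {

  /** Commits each record's offset to the consumer group only after the element is processed. */
  static class ProcessAndCommitFn extends DoFn<KafkaRecord<String, String>, Void> {
    private transient KafkaConsumer<String, String> committer;

    @Setup
    public void setup() {
      Map<String, Object> config = new HashMap<>();
      config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
      config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      committer = new KafkaConsumer<>(config);
    }

    @ProcessElement
    public void processElement(@Element KafkaRecord<String, String> rec) {
      // ... business logic on rec.getKV() would go here ...

      // The committed value is the *next* offset to read, hence offset + 1.
      committer.commitSync(Collections.singletonMap(
          new TopicPartition(rec.getTopic(), rec.getPartition()),
          new OffsetAndMetadata(rec.getOffset() + 1)));
    }

    @Teardown
    public void teardown() {
      if (committer != null) {
        committer.close();
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("my-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Keep Kafka's own auto-commit off so the explicit commit above is the only one.
            .withConsumerConfigUpdates(Map.<String, Object>of(
                ConsumerConfig.GROUP_ID_CONFIG, "my-group",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)))
        .apply(ParDo.of(new ProcessAndCommitFn()));
    p.run();
  }
}

Committing per record is deliberately simplistic; in practice you would batch commits per bundle or per partition (as in the per-partition sketch above) to avoid one commit round-trip per element.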

Re: Kafka manually commit offsets

2021-12-10 Thread Juan Calvo Ferrándiz
Also, I was wondering if we could end up with some kind of race condition: bundle 1 contains messages [1,2,3,4,5] and bundle 2 contains messages [6,7]. If bundle 2 completes before bundle 1, then it will commit all messages up to offset 7. If bundle 1 fails for whatever reason, we potentially lose tha…

Re: Kafka manually commit offsets

2021-12-10 Thread Juan Calvo Ferrándiz
Thanks, Luke, for your quick response. I see, that makes sense. Now I have two new questions, if I may: a) How can I get the offsets I want to commit? My investigation is currently going through getCheckpointMark(); is this correct? https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/Unboun…
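
A small sketch of the route the replies above converge on: instead of digging into getCheckpointMark(), take the topic, partition and offset straight from each KafkaRecord's metadata and carry them downstream for a later commit step (names here are illustrative).

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Emits "topic:partition" paired with the record's offset, ready for a later commit step. */
class ExtractOffsetFn extends DoFn<KafkaRecord<String, String>, KV<String, Long>> {
  @ProcessElement
  public void processElement(
      @Element KafkaRecord<String, String> rec, OutputReceiver<KV<String, Long>> out) {
    out.output(KV.of(rec.getTopic() + ":" + rec.getPartition(), rec.getOffset()));
  }
}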

Fwd: Kafka manually commit offsets

2021-12-09 Thread Juan Calvo Ferrándiz
Morning! First of all, thanks for all the incredible work you do; it is amazing. Secondly, I am reaching out for some help or guidance on manually committing records. I want to do this so I can commit the record at the end of the pipeline, and not in the read() of the KafkaIO. Bearing in mind what I ha…
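
For context, the built-in KafkaIO option closest to this request is commitOffsetsInFinalize(), which commits read offsets when the runner finalizes a checkpoint instead of relying on the consumer's auto-commit; the thread previews above do not mention it explicitly, so this is only a sketch of a nearby alternative, with placeholder broker, topic and group names.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FinalizeCommitSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // A group.id is required so the committed offsets are tracked for this consumer group.
        .withConsumerConfigUpdates(Map.<String, Object>of(ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
        // Commit read offsets when the runner finalizes its checkpoint, not during read().
        .commitOffsetsInFinalize());
    p.run();
  }
}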