Super! Thanks for all this info. I'm testing windowing on a per-key
basis, with one consumer per topic/partition/schema. This way, the bundle
of data from a specific window appears to wait until the previous one
has been processed, independently of the number of records.
*Juan*
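
Roughly, the fragment under test looks like this (a sketch only; the
string key format, the String/String record types, and the one-minute
window are placeholder assumptions, and the schema component of the key
is omitted; 'records' stands for the output of KafkaIO.read()):

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// 'records' is the PCollection<KafkaRecord<String, String>> produced by
// KafkaIO.read(). Key each record by "topic-partition" so that, per key,
// each window's records are grouped and handled as a single bundle.
PCollection<KV<String, Iterable<KafkaRecord<String, String>>>> perKeyWindows =
    records
        .apply("KeyByTopicPartition", ParDo.of(
            new DoFn<KafkaRecord<String, String>,
                KV<String, KafkaRecord<String, String>>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                KafkaRecord<String, String> rec = c.element();
                c.output(KV.of(
                    rec.getTopic() + "-" + rec.getPartition(), rec));
              }
            }))
        .setCoder(KvCoder.of(
            StringUtf8Coder.of(),
            KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
        // Placeholder window size: one grouped bundle per key per minute.
        .apply(Window.<KV<String, KafkaRecord<String, String>>>into(
            FixedWindows.of(Duration.standardMinutes(1))))
        .apply(GroupByKey.create());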
What I mean is, if you want to commit offsets only *after* a
KafkaRecord is processed, then you need to keep parallelism to the
number of partitions, as offsets are monotonically increasing *per
partition*. So if you only have one partition and then split into two
'threads', with T1 handling offsets [1-5] and T2 handling offsets [6-7],
T2 could finish and commit offset 7 before T1 completes.
If you want to ensure you have at-least-once processing, I think the
*maximum* amount of parallelization you can have would be the number of
partitions you have, so you'd want to group by partition, process a bundle
of that partition, then commit the last offset for that partition.
*~Vincent*
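
A sketch of that approach (assumptions: the records have already been
grouped per topic-partition as KV<String, Iterable<KafkaRecord<String,
String>>>, KafkaIO's own offset committing is off, and processRecord(),
the bootstrap servers, and the group id are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Processes one grouped bundle per topic-partition, then commits the highest
// offset seen in that bundle, so a commit never runs ahead of processing.
class ProcessThenCommitFn
    extends DoFn<KV<String, Iterable<KafkaRecord<String, String>>>, Void> {

  private transient KafkaConsumer<String, String> committer;

  @Setup
  public void setup() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder
    props.put("group.id", "my-group");  // must match the group KafkaIO reads with
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    committer = new KafkaConsumer<>(props);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    String topic = null;
    int partition = -1;
    long maxOffset = -1L;
    for (KafkaRecord<String, String> rec : c.element().getValue()) {
      processRecord(rec);  // placeholder for the actual work
      if (rec.getOffset() > maxOffset) {
        maxOffset = rec.getOffset();
        topic = rec.getTopic();
        partition = rec.getPartition();
      }
    }
    if (maxOffset >= 0) {
      // Commit for exactly this partition.
      committer.commitSync(Collections.singletonMap(
          new TopicPartition(topic, partition),
          new OffsetAndMetadata(maxOffset + 1)));
    }
  }

  @Teardown
  public void teardown() {
    if (committer != null) {
      committer.close();
    }
  }

  private void processRecord(KafkaRecord<String, String> rec) {
    // placeholder
  }
}

Committing maxOffset + 1 follows the Kafka convention that the committed
offset is the next record to read, not the last one processed.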
Thanks Alexey! I understand. Continuing to think about possible solutions
for committing records, I was wondering what happens in this scenario:
when processing windows of data, do they get processed in sequential order,
or is it possible for them to be processed out of order? For example, Window
1 c
I answered a similar question on SO a while ago [1], and I hope it will help.
“By default, pipeline.apply(KafkaIO.read()...) will return a
PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an
offset from KafkaRecord metadata and commit it manually in a way that you need
(just don't f
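
For reference, the metadata access the quoted answer describes looks
roughly like this (the types and the DoFn name are illustrative):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;

// Each element downstream of KafkaIO.read() carries its Kafka metadata,
// so the coordinates needed for a manual commit are on the record itself.
class ExtractOffsetFn extends DoFn<KafkaRecord<String, String>, Void> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KafkaRecord<String, String> rec = c.element();
    String topic = rec.getTopic();
    int partition = rec.getPartition();
    long offset = rec.getOffset();  // the offset you would commit manually
    // ... process rec.getKV().getValue(), then commit (topic, partition, offset + 1)
  }
}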
Also, I was wondering if we could end up with some kind of race
condition:
bundle 1 contains messages [1,2,3,4,5]
bundle 2 contains messages [6,7]
If bundle 2 completes before bundle 1, then it will commit all
messages up to offset 7. If bundle 1 fails for whatever reason, we
potentially lose that data.
Thanks Luke for your quick response. I see, that makes sense. Now I have
two new questions, if I may:
a) How can I get the offsets I want to commit? My investigation is now
going through getCheckpointMark(); is this correct?
https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/Unboun
Morning!
First of all, thanks for all the incredible work you do; it's amazing.
Secondly, I'm reaching out for some help or guidance on manually committing
records. I want to do this so I can commit the record at the end of the
pipeline, and not in the read() of KafkaIO.
Bearing in mind what I ha