If you want to ensure you have at-least-once processing, I think the
*maximum* amount of parallelization you can have would be the number of
partitions you have, so you'd want to group by partition, process a bundle
of that partition, then commit the last offset for that partition.
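
Roughly the shape I have in mind (a sketch only, not a complete program;
the broker/topic names, the window size, and CommitLastOffsetPerPartitionFn
are placeholders, and that DoFn would hold its own Kafka client for
committing, along the lines discussed further down the thread):

    PCollection<KafkaRecord<String, String>> records =
        pipeline.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));

    records
        // Key every record by its Kafka partition.
        .apply(WithKeys.of((KafkaRecord<String, String> r) -> r.getPartition())
            .withKeyType(TypeDescriptors.integers()))
        // An unbounded input needs windowing/triggering before GroupByKey.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        // One group per partition (per window).
        .apply(GroupByKey.create())
        // Process the group, then commit the highest offset seen in it.
        .apply(ParDo.of(new CommitLastOffsetPerPartitionFn()));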

*~Vincent*


On Fri, Dec 10, 2021 at 9:28 AM Luke Cwik <lc...@google.com> wrote:

> Yes, you will need to deal with records being out of order because the
> system will process many things in parallel.
>
> You can read the last committed offset from Kafka and compare it against
> the offset you have right now. If the offset you have right now is not the
> next offset, store it in state; if it is, find the contiguous range of
> offsets that you have stored in state starting from this offset, remove
> them from state, and commit the last one in that contiguous range.
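>
> As a rough sketch of just that bookkeeping (plain Java with hypothetical
> names; in Beam this would typically live in a stateful DoFn keyed by
> topic/partition, with the pending set and last committed offset kept in
> state):
>
>   import java.util.TreeSet;
>
>   class OffsetTracker {
>     // Offsets that finished processing but can't be committed yet because
>     // an earlier offset is still outstanding.
>     private final TreeSet<Long> pending = new TreeSet<>();
>     // Initialized from the last committed offset read from Kafka.
>     private long lastCommitted;
>
>     OffsetTracker(long lastCommitted) {
>       this.lastCommitted = lastCommitted;
>     }
>
>     // Called when processing of 'offset' has finished. Returns the highest
>     // offset that is now safe to commit, or -1 if nothing is committable yet.
>     long onFinished(long offset) {
>       if (offset != lastCommitted + 1) {
>         pending.add(offset); // out of order: park it until the gap is filled
>         return -1;
>       }
>       long commitUpTo = offset; // extend the contiguous range as far as possible
>       while (pending.remove(commitUpTo + 1)) {
>         commitUpTo++;
>       }
>       lastCommitted = commitUpTo;
>       // Then commit with your own client, e.g.
>       // consumer.commitSync(singletonMap(tp, new OffsetAndMetadata(commitUpTo + 1)));
>       return commitUpTo;
>     }
>   }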
>
> On Fri, Dec 10, 2021 at 8:18 AM Juan Calvo Ferrándiz <
> juancalvoferran...@gmail.com> wrote:
>
>>
>>
>> Thanks Alexey! I understand. Continuing to think about possible solutions
>> for committing records, I was wondering what happens in this scenario:
>>
>> When processing windows of data, do they get processed in sequential
>> order, or is it possible for them to be processed out of order? For
>> example, window 1 contains 10,000 elements of data whereas window 2
>> contains 10 elements. Assuming window 1 takes a while to process all of
>> that data, is it possible window 2 will finish before window 1?
>>
>> Thanks again!
>>
>> On Fri, 10 Dec 2021 at 14:39, Alexey Romanenko <aromanenko....@gmail.com>
>> wrote:
>>
>>> I answered a similar question on SO a while ago [1], and I hope it
>>> will help.
>>>
>>> “By default, pipeline.apply(KafkaIO.read()...) will return
>>> a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can
>>> get an offset from KafkaRecord metadata and commit it manually in a way
>>> that you need (just don't forget to disable AUTO_COMMIT in KafkaIO.read()).
>>>
>>> By manual, I mean that you should instantiate your own Kafka client in
>>> your DoFn, process the input element (as a KafkaRecord<K, V>) that was
>>> read before, fetch the offset from the KafkaRecord, and commit it with
>>> your own client.
>>>
>>> Though, you need to make sure that the call to the external API and the
>>> offset commit are atomic to prevent potential data loss (if it's critical).”
>>>
>>> [1]
>>> https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
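>>>
>>> As a concrete (untested) sketch of that manual approach: the broker
>>> address, group id, and ProcessAndCommitFn below are placeholders, and
>>> callExternalApi stands in for whatever your pipeline actually does.
>>>
>>>   // (imports elided: Beam DoFn/KafkaRecord, Kafka consumer classes, java.util)
>>>   static class ProcessAndCommitFn extends DoFn<KafkaRecord<String, String>, Void> {
>>>
>>>     private transient KafkaConsumer<String, String> committer;
>>>
>>>     @Setup
>>>     public void setup() {
>>>       Map<String, Object> config = new HashMap<>();
>>>       config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
>>>       config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
>>>       config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>>>       config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>>>       config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>>>       committer = new KafkaConsumer<>(config);
>>>     }
>>>
>>>     @ProcessElement
>>>     public void process(@Element KafkaRecord<String, String> record) {
>>>       // 1. Call your external API with the record's value here, e.g.
>>>       //    callExternalApi(record.getKV().getValue());
>>>       // 2. Only once that has succeeded, commit this record's offset
>>>       //    (commit offset + 1, i.e. the next offset to be consumed).
>>>       committer.commitSync(Collections.singletonMap(
>>>           new TopicPartition(record.getTopic(), record.getPartition()),
>>>           new OffsetAndMetadata(record.getOffset() + 1)));
>>>     }
>>>
>>>     @Teardown
>>>     public void teardown() {
>>>       if (committer != null) {
>>>         committer.close();
>>>       }
>>>     }
>>>   }
>>>
>>> Committing synchronously for every element will add latency; batching the
>>> commits per bundle (e.g. in @FinishBundle) or per partition is a common
>>> way to reduce that.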
>>>
>>> —
>>> Alexey
>>>
>>> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <
>>> juancalvoferran...@gmail.com> wrote:
>>>
>>> Thanks, Luke, for your quick response. I see, that makes sense. Now I have
>>> two new questions, if I may:
>>> a) How can I get the offsets I want to commit? My investigation is now
>>> going through getCheckpointMark(); is this correct?
>>> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>>>
>>> b) With these offsets, I will create a client at the end of the pipeline
>>> with the Kafka library and use methods such as commitSync() and
>>> commitAsync(). Is this correct?
>>> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>>>
>>> Thanks!!!
>>>
>>> *Juan *
>>>
>>>
>>> On Fri, 10 Dec 2021 at 01:07, Luke Cwik <lc...@google.com> wrote:
>>>
>>>> commitOffsetsInFinalize is about committing the offset after the output
>>>> has been durably persisted for the bundle containing the Kafka Read. The
>>>> bundle represents a unit of work over a subgraph of the pipeline. You will
>>>> want to ensure that commitOffsetsInFinalize is disabled and that the Kafka
>>>> consumer config doesn't auto-commit. This will ensure that KafkaIO.Read
>>>> doesn't commit the offsets. Then it is up to your PTransform to perform
>>>> the committing.
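>>>>
>>>> For example, something along these lines (a sketch; the broker and topic
>>>> names are placeholders):
>>>>
>>>>   pipeline.apply(
>>>>       KafkaIO.<String, String>read()
>>>>           .withBootstrapServers("broker:9092")
>>>>           .withTopic("my-topic")
>>>>           .withKeyDeserializer(StringDeserializer.class)
>>>>           .withValueDeserializer(StringDeserializer.class)
>>>>           // Note: .commitOffsetsInFinalize() is NOT called, and the
>>>>           // consumer itself is told not to auto-commit:
>>>>           .withConsumerConfigUpdates(
>>>>               Collections.<String, Object>singletonMap(
>>>>                   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)));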
>>>>
>>>> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz <
>>>> juancalvoferran...@gmail.com> wrote:
>>>>
>>>>> Morning!
>>>>>
>>>>> First of all, thanks for all the incredible work you do; it is amazing.
>>>>> Secondly, I'm reaching out for some help or guidance on manually committing
>>>>> records. I want to do this so I can commit the record at the end of the
>>>>> pipeline, and not in the read() of the KafkaIO.
>>>>>
>>>>> Bearing in mind what I have read in this post:
>>>>> https://lists.apache.org/list?user@beam.apache.org:2021-9:user@beam.apache.org%20kafka%20commit
>>>>> , and thinking of a pipeline similar to the one described, I understand we
>>>>> can use commitOffsetsInFinalize() to commit offsets in the read().
>>>>> What I don't understand is how this helps to commit the offset if we want
>>>>> to do this at the end, not in the reading. Thanks. All comments and
>>>>> suggestions are more than welcome. :)
>>>>>
>>>>>
>>>>> *Juan *
>>>>>
>>>>>
>>>>>
>>>
