If you are using an exactly-once runner, it will guarantee every message is
consumed once (though the mechanism might not be obvious).

Generally what happens is that the messages are consumed into the system in
order. However if you have downstream ParDos, there is no guarantee that
they process the messages in the same order (especially if there is a
shuffle operation, such as GroupByKey, in between).

Now a future version of the source might decide to split the Kafka
partition if it's too large to handle on one thread (e.g. split it in half
where the first half is bounded and the second half is the growing
unbounded tail of the partition). In this case the source would keep two
checkpoints for the current position in each half of the partition. (this
mode of operation probably wouldn't be compatible with checkpointing
offsets back to the broker though.). The source doesn't do this today, I'm
just mentioning it to point out another way in which things could be
consumed out of order.

On Sun, Sep 25, 2022 at 11:40 AM Yomal de Silva <yomal.prav...@gmail.com>
wrote:

> Hi Reuven,
> Thanks for those clarifications.
>
> For the 4th question that I raised, if A gets failed and B is committed,
> will those messages(A) get consumed again from Kafka or will the messages
> get recovered from the checkpoint and retried in that specific operator?
>
> On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user <user@beam.apache.org>
> wrote:
>
>>
>>
>> On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva <yomal.prav...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have started using KafkaIO to read a data stream and have the
>>> following questions. Appreciate it if you could provide a few
>>> clarifications on the following.
>>>
>>>
>> 1. Does KafkaIO ignore the offset stored in the broker and uses the
>>> offset stored during checkpointing when consuming messages?
>>>
>>
>> Generally yes, as that's the only way to guarantee consistency (we can't
>> atomically commit to the runner and to Kafka). However when starting a new
>> pipeline, you should be able to start reading at the broker checkpoint.
>>
>>
>>> 2. How many threads will be used by the Kafka consumer?
>>>
>>
>> This depends somewhat on the runner, but you can expect one thread per
>> partition.
>>
>>
>>> 3. If the consumer polls a set of messages A and then later B while A is
>>> still being processed, is there a possibility of set B finishing before A?
>>> Does parallelism control this?
>>>
>>
>> yes. Beam doesn't currently have any notion of ordering. All messages are
>> independent and can be processed at different times (the source also
>> reserves the right to process different ranges of a single Kafka partition
>> on different threads, though it doesn't currently do this).
>>
>>
>>> 4. In the above scenario if B is committed back to the broker and
>>> somehow A failed, upon a restart is there any way we can consume A again
>>> without losing data?
>>>
>>
>> Data should never be lost. If B is processed, then you can assume that
>> the A data is checkpointed inside the Beam runner and will be processed to.
>>
>>
>>
>>>
>>> Thank you.
>>>
>>>
>>>
>>

Reply via email to