But still, if we roll out a new deployment in which the state cannot be
recovered from the previous snapshot/savepoint, there is a possibility of
data loss, right? I am thinking of the case where we modify existing
operators, or add/delete operators, in such a way that the operator state
can no longer be mapped onto the snapshot. That seems like a very realistic
scenario when rolling out new features in the pipeline.

Any thoughts on this? In such a case, what would be the best practice for
recovering those records?
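
I'm not sure it fully solves this, but one mitigation we are evaluating is
giving every transform an explicit, stable name and keeping the pipeline
shape unchanged where possible, so that (at least on the Flink runner, as
far as I understand) operator state has a chance to be mapped onto the new
job graph. A minimal sketch, with a made-up broker address, topic, and step
names:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StableNamesPipeline {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Explicit step names stay stable across deployments; renaming or
        // removing a stateful step is what makes its state unrecoverable.
        .apply("ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("kafka:9092")  // made-up address
                .withTopic("events")                 // made-up topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply("ExtractValues", Values.<String>create());

    pipeline.run();
  }
}

Even with stable names, though, dropping a stateful operator would still
lose that operator's state, which is the part I am unsure about.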

On Mon, Sep 26, 2022 at 12:55 AM Reuven Lax <re...@google.com> wrote:

> If you are using an exactly-once runner, it will guarantee every message
> is consumed once (though the mechanism might not be obvious).
>
> Generally what happens is that the messages are consumed into the system
> in order. However, if you have downstream ParDos, there is no guarantee that
> they process the messages in the same order (especially if there is a
> shuffle operation, such as GroupByKey, in between).
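>
> As a rough illustration (the broker address, topic, and step names are
> made up), in a pipeline shaped like the following, nothing constrains the
> order in which the grouped values emerge relative to their Kafka offsets:
>
> // Assumes the usual imports: org.apache.beam.sdk.io.kafka.KafkaIO,
> // org.apache.beam.sdk.transforms.GroupByKey, the windowing classes under
> // org.apache.beam.sdk.transforms.windowing, org.joda.time.Duration, and
> // org.apache.kafka.common.serialization.StringDeserializer.
> PCollection<KV<String, String>> records =
>     pipeline.apply("ReadFromKafka",
>         KafkaIO.<String, String>read()
>             .withBootstrapServers("kafka:9092")
>             .withTopic("events")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata());
>
> records
>     // An unbounded input needs a non-global window (or a trigger) before
>     // GroupByKey can fire.
>     .apply("Window", Window.<KV<String, String>>into(
>         FixedWindows.of(Duration.standardMinutes(1))))
>     // The shuffle behind GroupByKey makes no ordering promises: the
>     // values grouped under each key may arrive in any order.
>     .apply("GroupByKey", GroupByKey.<String, String>create());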
>
> Now a future version of the source might decide to split the Kafka
> partition if it's too large to handle on one thread (e.g. split it in half
> where the first half is bounded and the second half is the growing
> unbounded tail of the partition). In this case the source would keep two
> checkpoints for the current position in each half of the partition (though
> this mode of operation probably wouldn't be compatible with checkpointing
> offsets back to the broker). The source doesn't do this today; I'm
> just mentioning it to point out another way in which things could be
> consumed out of order.
>
> On Sun, Sep 25, 2022 at 11:40 AM Yomal de Silva <yomal.prav...@gmail.com>
> wrote:
>
>> Hi Reuven,
>> Thanks for those clarifications.
>>
>> For the 4th question that I raised: if A fails and B is committed, will
>> those messages (A) be consumed again from Kafka, or will they be recovered
>> from the checkpoint and retried in that specific operator?
>>
>> On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>
>>>
>>>
>>> On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva <yomal.prav...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have started using KafkaIO to read a data stream and have a few
>>>> questions. I would appreciate it if you could provide some
>>>> clarifications on the following.
>>>>
>>>>
>>>> 1. Does KafkaIO ignore the offset stored in the broker and use the
>>>> offset stored during checkpointing when consuming messages?
>>>>
>>>
>>> Generally yes, as that's the only way to guarantee consistency (we can't
>>> atomically commit to the runner and to Kafka). However, when starting a new
>>> pipeline, you should be able to start reading at the broker checkpoint.
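>>>
>>> For a brand-new pipeline that should resume from the broker's committed
>>> position, something like this should work (the group id is made up):
>>>
>>> // Assumes imports for KafkaIO, ConsumerConfig, StringDeserializer, and
>>> // ImmutableMap (any Map<String, Object> works for the config updates).
>>> KafkaIO.<String, String>read()
>>>     .withBootstrapServers("kafka:9092")
>>>     .withTopic("events")
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializer(StringDeserializer.class)
>>>     .withConsumerConfigUpdates(
>>>         ImmutableMap.<String, Object>of(
>>>             ConsumerConfig.GROUP_ID_CONFIG, "my-pipeline"))
>>>     // Best-effort commit back to Kafka when Beam finalizes a checkpoint;
>>>     // while the pipeline is running, the runner's checkpoint still wins.
>>>     .commitOffsetsInFinalize();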
>>>
>>>
>>>> 2. How many threads will be used by the Kafka consumer?
>>>>
>>>
>>> This depends somewhat on the runner, but you can expect one thread per
>>> partition.
>>>
>>>
>>>> 3. If the consumer polls a set of messages A and then later B while A
>>>> is still being processed, is there a possibility of set B finishing before
>>>> A? Does parallelism control this?
>>>>
>>>
>>> Yes. Beam doesn't currently have any notion of ordering. All messages
>>> are independent and can be processed at different times (the source also
>>> reserves the right to process different ranges of a single Kafka partition
>>> on different threads, though it doesn't currently do this).
>>>
>>>
>>>> 4. In the above scenario if B is committed back to the broker and
>>>> somehow A failed, upon a restart is there any way we can consume A again
>>>> without losing data?
>>>>
>>>
>>> Data should never be lost. If B is processed, then you can assume that
>>> the A data is checkpointed inside the Beam runner and will be processed
>>> too.
>>>
>>>
>>>
>>>>
>>>> Thank you.
>>>>
>>>>
>>>>
>>>
