But still, if we roll out a new deployment in which the state cannot be recovered from the previous snapshot/savepoint, there is a possibility of data loss here, right? This is the case if we modify existing operators, or add/delete operators, in such a way that the operator state can no longer be restored from the snapshot. I think this is a very valid scenario when rolling out new features into the pipeline.
Any thoughts on this? In such a case, what would be the best practice to recover those records?

On Mon, Sep 26, 2022 at 12:55 AM Reuven Lax <re...@google.com> wrote:

> If you are using an exactly-once runner, it will guarantee every message
> is consumed once (though the mechanism might not be obvious).
>
> Generally what happens is that the messages are consumed into the system
> in order. However, if you have downstream ParDos, there is no guarantee
> that they process the messages in the same order (especially if there is
> a shuffle operation, such as GroupByKey, in between).
>
> Now, a future version of the source might decide to split the Kafka
> partition if it's too large to handle on one thread (e.g. split it in
> half, where the first half is bounded and the second half is the growing
> unbounded tail of the partition). In this case the source would keep two
> checkpoints for the current position in each half of the partition. (This
> mode of operation probably wouldn't be compatible with checkpointing
> offsets back to the broker, though.) The source doesn't do this today;
> I'm just mentioning it to point out another way in which things could be
> consumed out of order.
>
> On Sun, Sep 25, 2022 at 11:40 AM Yomal de Silva <yomal.prav...@gmail.com> wrote:
>
>> Hi Reuven,
>> Thanks for those clarifications.
>>
>> For the 4th question that I raised, if A fails and B is committed, will
>> those messages (A) get consumed again from Kafka, or will the messages
>> get recovered from the checkpoint and retried in that specific operator?
>>
>> On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user <user@beam.apache.org> wrote:
>>
>>> On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva <yomal.prav...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have started using KafkaIO to read a data stream and have the
>>>> following questions. Appreciate it if you could provide a few
>>>> clarifications on the following.
>>>>
>>>> 1. Does KafkaIO ignore the offset stored in the broker and use the
>>>> offset stored during checkpointing when consuming messages?
>>>
>>> Generally yes, as that's the only way to guarantee consistency (we
>>> can't atomically commit to the runner and to Kafka). However, when
>>> starting a new pipeline, you should be able to start reading at the
>>> broker checkpoint.
>>>
>>>> 2. How many threads will be used by the Kafka consumer?
>>>
>>> This depends somewhat on the runner, but you can expect one thread per
>>> partition.
>>>
>>>> 3. If the consumer polls a set of messages A and then later B while A
>>>> is still being processed, is there a possibility of set B finishing
>>>> before A? Does parallelism control this?
>>>
>>> Yes. Beam doesn't currently have any notion of ordering. All messages
>>> are independent and can be processed at different times (the source
>>> also reserves the right to process different ranges of a single Kafka
>>> partition on different threads, though it doesn't currently do this).
>>>
>>>> 4. In the above scenario, if B is committed back to the broker and
>>>> somehow A failed, upon a restart is there any way we can consume A
>>>> again without losing data?
>>>
>>> Data should never be lost. If B is processed, then you can assume that
>>> the A data is checkpointed inside the Beam runner and will be
>>> processed too.
>>>
>>>> Thank you.
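The replay behaviour Reuven describes for question 4 can be sketched as a toy model. This is plain Python, not the Beam or Kafka API; every name here is hypothetical, and it only illustrates the rule "the runner checkpoint is authoritative, the broker offset is a fallback for brand-new pipelines":

```python
def messages_to_replay(partition, runner_checkpoint, broker_committed):
    """Return the messages a restarted pipeline would consume.

    partition: the list of messages in one Kafka partition (toy stand-in).
    runner_checkpoint: offset durably recorded by the runner, or None if
        the pipeline is brand new and has no checkpoint yet.
    broker_committed: offset committed back to the Kafka broker.

    The runner checkpoint wins, so a message whose results were never
    checkpointed is replayed even if a later offset already reached the
    broker. Only a fresh pipeline starts at the broker offset.
    """
    start = runner_checkpoint if runner_checkpoint is not None else broker_committed
    return partition[start:]


partition = ["A", "B", "C", "D"]

# B's bundle finished and offset 2 was committed to the broker, but A's
# bundle failed before the runner checkpointed past offset 0:
print(messages_to_replay(partition, runner_checkpoint=0, broker_committed=2))
# -> ['A', 'B', 'C', 'D']  (A is replayed; nothing is lost)

# A brand-new pipeline with no runner checkpoint starts at the broker offset:
print(messages_to_replay(partition, runner_checkpoint=None, broker_committed=2))
# -> ['C', 'D']
```

The design point the thread makes is that Beam cannot atomically commit to both the runner and the broker, so the broker offset is best-effort bookkeeping only, which is also why a restart can deliver some messages to your ParDos more than once.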