Re: KafkaIO committing semantics

2020-09-07 Thread Alexey Romanenko
From my understanding:
- ENABLE_AUTO_COMMIT_CONFIG tells the Kafka consumer (used inside KafkaIO to read messages) to periodically commit offsets in the background;
- on the other hand, if "commitOffsetsInFinalize()" is used, then the Beam checkpoint mechanism will be leveraged to restart from che…
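
To make the contrast concrete, here is a minimal sketch (not from the thread) of the two modes in the Beam Java SDK. The broker address, topic name, and group id are placeholders:

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCommitModes {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // (a) Background auto-commit by the Kafka consumer itself, independent of
    // what the Beam pipeline has actually processed.
    PCollection<KV<Long, String>> autoCommit =
        p.apply("ReadWithAutoCommit",
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("my_topic")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(Map.<String, Object>of(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
                .withoutMetadata());

    // (b) Commit offsets when the runner finalizes a checkpoint, so offsets only
    // advance for records the runner has durably accepted.
    PCollection<KV<Long, String>> commitOnFinalize =
        p.apply("ReadWithCommitOnFinalize",
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("my_topic")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(Map.<String, Object>of(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
                .commitOffsetsInFinalize()
                .withoutMetadata());

    // ... rest of pipeline ...
    p.run();
  }
}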

Re: KafkaIO committing semantics

2020-09-08 Thread Gaurav Nakum
Thank you very much for your explanation! commitOffsetsInFinalize() -> although checkpointing depends on the runner, is it not configurable in a connector implementation? Basically, I want to understand how this can be done with a new IO connector implementation, esp. with the new SDF API. If I…

Re: KafkaIO committing semantics

2020-09-09 Thread Alexey Romanenko
Sorry, I can’t say much about SDF. Maybe Lukasz Cwik can provide more details on this. > On 8 Sep 2020, at 09:01, Gaurav Nakum wrote: > > Thank you very much for your explanation! > commitOffsetsInFinalize() -> although checkpointing depends on the runner, is > it not configurable in a connector…

Re: KafkaIO committing semantics

2020-09-10 Thread Luke Cwik
+Boyuan Zhang You can perform commit-like side effects like this in two ways:
1) Output commits to a downstream PCollection:
   Read -> PCollection -> ... rest of pipeline ...
                      \-> PCollection -> Reshuffle -> ParDo(PerformCommitSideEffects)
This method is preferred if you can perform a commit fro…
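
Not from the thread itself, here is a rough sketch of option (1), assuming a hypothetical CommitRecord element type and a placeholder commit call; the read step is assumed to emit these records on a secondary output:

import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical element type describing what needs to be acknowledged,
// e.g. a topic/partition/offset triple.
class CommitRecord implements Serializable {
  final String topic;
  final int partition;
  final long offset;

  CommitRecord(String topic, int partition, long offset) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }
}

class PerformCommitSideEffects extends DoFn<CommitRecord, Void> {
  @ProcessElement
  public void processElement(@Element CommitRecord commit) {
    // Placeholder: call whatever client API commits/acknowledges this offset.
    // Elements only reach this DoFn after they have passed through Reshuffle.
  }
}

class CommitBranch {
  // Attach the commit branch to the PCollection of commit records produced by the read step.
  static PCollection<Void> attach(PCollection<CommitRecord> commits) {
    return commits
        .apply("PersistBeforeCommit", Reshuffle.viaRandomKey())
        .apply("PerformCommitSideEffects", ParDo.of(new PerformCommitSideEffects()));
  }
}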

Re: KafkaIO committing semantics

2021-01-12 Thread Yu Zhang
Hi Boyuan, Thanks for the information you shared. For option 1 you mentioned below, will there be any data loss if failures occur in the rest of the pipeline while the ParDo(PerformCommitSideEffects) actually commits the data? How does Reshuffle() help perform the commit and achieve at-least-once semantics?

Re: KafkaIO committing semantics

2021-01-12 Thread Boyuan Zhang
Hi Yu, Reshuffle is treated as a persistence layer in the case I was talking about. For example, let's say that we have a simple pipeline like: Create("A", "B") -> Some ParDo -> Reshuffle() -> Your Commit ParDo, and "A" has been output to Reshuffle() while "B" is still in Some ParDo. At thi…
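
Not from the thread: a runnable sketch of the pipeline Boyuan describes, with placeholder ParDos; whether Reshuffle actually acts as a durable checkpoint here is runner-dependent:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

public class ReshuffleAsPersistencePoint {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of("A", "B"))
        .apply("SomeParDo", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String e, OutputReceiver<String> out) {
            out.output(e); // some per-element work
          }
        }))
        // Persistence point: an element that has already been written into the
        // Reshuffle ("A" in the example) is not reprocessed when the upstream
        // ParDo is retried; an element still upstream ("B") is.
        .apply(Reshuffle.viaRandomKey())
        .apply("YourCommitParDo", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(@Element String e) {
            // Placeholder for the commit side effect.
          }
        }));
    p.run();
  }
}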

Re: KafkaIO committing semantics

2021-01-13 Thread Yu Zhang
Hi Boyuan, Thanks for your explanation. So from my understanding, the commit ParDo commits all messages in Reshuffle(); when the user pipeline (using KafkaIO to read) fails and starts over, it will resume messages from Reshuffle(), so there might be duplicate messages but never lost messages in the user pipeline.

Re: KafkaIO committing semantics

2021-01-19 Thread Boyuan Zhang
> > So from my understanding, the commit ParDo commits all messages in Reshuffle(); > when the user pipeline (using KafkaIO to read) fails and starts over, it will > resume messages from Reshuffle(), so there might be duplicate messages but never > lost messages in the user pipeline. If the `user pipeline (using KafkaIO…