Hi Yu,

Reshuffle is treated as a persistent layer in the case I was talking about. For example, let's say we have a simple pipeline like:

Create("A", "B") -> Some ParDo -> Reshuffle() -> Your Commit ParDo

and "A" has already been output to Reshuffle() while "B" is still in Some ParDo. If Some ParDo fails at this moment and the runner retries that bundle, only "B" is going to be retried, since "A" is already in the Reshuffle.
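In Beam Java, that shape looks roughly like the minimal sketch below; the ParDo bodies, the element type, and the commit step itself are illustrative placeholders rather than anything from this thread:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

public class ReshuffleCommitSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of("A", "B"))
        .apply("SomeParDo", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(@Element String element, OutputReceiver<String> out) {
            out.output(element);  // arbitrary processing before the barrier
          }
        }))
        // Elements that have made it past the Reshuffle are durably persisted;
        // a retry of the upstream bundle will not re-emit them.
        .apply(Reshuffle.viaRandomKey())
        .apply("CommitParDo", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void process(@Element String element) {
            // perform the commit side effect here (placeholder)
          }
        }));
    p.run();
  }
}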
Without Reshuffle(), you cannot guarantee that elements that have already been committed will not be retried.

On Tue, Jan 12, 2021 at 4:28 PM Yu Zhang <yu.b.zh...@oracle.com> wrote:

Hi Boyuan,

Thanks for the information you shared. For option 1 you mentioned below, will there be any data loss if failures occur in the *rest of pipeline* while *ParDo(PerformCommitSideEffects)* actually commits the data? How does Reshuffle() help perform the commit and achieve at-least-once semantics?

Thanks,
Yu

On 2020/09/10 17:05:28, Luke Cwik <l...@google.com> wrote:

+Boyuan Zhang <bo...@google.com>

You can perform commit-like side effects like this in two ways:

1) Output commits to a downstream PCollection:

Read -> PCollection<Records> -> ... rest of pipeline ...
    \-> PCollection<Commits> -> Reshuffle -> ParDo(PerformCommitSideEffects)

This method is preferred if you can perform a commit from a different worker and you're not bound to some in-process state (e.g. a JDBC connection), since it is guaranteed to happen and isn't best effort. It also uses the data path, which is optimized to be as performant as possible.

2) Use the BundleFinalizer [1, 2] and register a callback after the bundle is durably persisted. This is best effort and exists because some APIs have resident process state which can't be moved to another worker, so the callback always comes back to the same machine.

1: https://s.apache.org/beam-finalizing-bundles
2: https://github.com/apache/beam/blob/1463ff08a4f782594dff64873d0cb70ca13d8f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1367
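A minimal sketch of the BundleFinalizer approach from (2), as a Beam Java DoFn; the acknowledgement step and the callback timeout are assumptions for illustration:

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Registers a best-effort callback that runs on the same worker once the
// bundle's output has been durably persisted.
class CommitOnBundleFinalizeFn extends DoFn<String, String> {
  @ProcessElement
  public void process(
      @Element String element, OutputReceiver<String> out, BundleFinalizer finalizer) {
    out.output(element);
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)),  // illustrative callback expiry
        () -> {
          // Hypothetical in-process acknowledgement of `element`, e.g. against a
          // client or connection that only exists on this worker.
        });
  }
}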
On Wed, Sep 9, 2020 at 8:24 AM Alexey Romanenko <ar...@gmail.com> wrote:

Sorry, I can't say much about SDF. Maybe Lukasz Cwik can provide more details on this.

On 8 Sep 2020, at 09:01, Gaurav Nakum <ga...@oracle.com> wrote:

Thank you very much for your explanation!

commitOffsetsInFinalize() -> although checkpointing depends on the runner, is it not configurable in a connector implementation?

Basically, I want to understand how this can be done with a new IO connector implementation, especially with the new *SDF* API. If I am right, in the traditional UnboundedSource API checkpointing was configured using *UnboundedSource.CheckpointMark*, but I am not sure about the SDF API. Also, since KafkaIO SDF read does not provide *commitOffsetsInFinalize* functionality, could you point to some resources which discuss checkpointing using the new SDF API?

Thank you,
Gaurav

On 9/7/20 10:54 AM, Alexey Romanenko wrote:

From my understanding:
- ENABLE_AUTO_COMMIT_CONFIG tells the Kafka consumer (used inside KafkaIO to read messages) to periodically commit offsets in the background;
- on the other hand, if "commitOffsetsInFinalize()" is used, then the Beam checkpoint mechanism will be leveraged to restart from checkpoints in case of failures. It won't need to wait for the pipeline to finish, though it's up to the runner to decide when and how often to save checkpoints.

In KafkaIO, it's possible to use *only one* option for the same transform - either ENABLE_AUTO_COMMIT_CONFIG or commitOffsetsInFinalize().

On 6 Sep 2020, at 07:24, Apple <ga...@oracle.com> wrote:

Hi everyone,

I have a question on KafkaIO. What is the difference between setting *AUTO_COMMIT_CONFIG* and *commitOffsetsInFinalize()*? My understanding is that:

1. *AUTO_COMMIT_CONFIG* commits Kafka records as soon as KafkaIO.read() outputs messages, but I am not sure how this would be helpful; e.g. if a consumer transform after KafkaIO.read() fails, the messages would be lost (which sounds like at-most-once semantics).

2. *commitOffsetsInFinalize()* commits when the pipeline is finished. But when does the pipeline end? In other words, when is PipelineResult.State = Done in a streaming scenario?

Thanks!
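For reference, a minimal sketch of the two KafkaIO configurations contrasted in this thread, applied to two separate reads since only one option can be used per transform; the broker, topic, and group id values are placeholders:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCommitOptionsSketch {
  static void addReads(Pipeline p) {
    Map<String, Object> finalizeConfig = new HashMap<>();
    // A consumer group id is required for committing offsets back to Kafka.
    finalizeConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

    // Option A: offsets are committed when the runner finalizes a checkpoint, so a
    // restarted pipeline resumes from the last finalized offset.
    p.apply("ReadWithFinalize",
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("my-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(finalizeConfig)
            .commitOffsetsInFinalize());

    Map<String, Object> autoCommitConfig = new HashMap<>();
    autoCommitConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    autoCommitConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

    // Option B: the Kafka consumer commits offsets periodically in the background,
    // independent of whether downstream transforms have processed the records.
    p.apply("ReadWithAutoCommit",
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("my-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(autoCommitConfig));
  }
}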