Re: KafkaIO committing semantics

2021-01-19 Thread Boyuan Zhang
>
> So from my understanding, the commit ParDo commits all messages in
> Reshuffle(); when the user pipeline (using KafkaIO to read) fails and starts
> over, it will resume the messages in Reshuffle(), so there might be duplicate
> messages but never lost messages in the user pipeline.


If `user pipeline (using KafkaIO to read) fails and starts over` means that
the failed bundle will be retried, then your understanding is correct. If
you mean that the pipeline is stopped and restarted, then the read will
start from the position configured by the pipeline author. When a pipeline
is stopped, the records persisted in Reshuffle are also deleted.

> For option 2 using BundleFinalizer, in a connector's SDF, the callback will
> commit the offset. Since the bundle has been durably persisted by the runner,
> even if the user pipeline (using the connector) fails and starts over, the
> runner will retry with the persisted bundle, which helps achieve
> at-least-once. If this is correct, what do you mean by best effort here?
>

Bundle finalization is best effort rather than a guaranteed behavior. That
means that, given a finalization callback, the runner tries its best to ask
the SDK to perform the callback, and the SDK side in turn tries its best to
invoke it. But in the worst case, the finalization callback is never called.
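
For concreteness, here is a minimal sketch of registering such a callback from
an SDF-style DoFn, assuming the Beam Java SDK and the Kafka client library.
This is not KafkaIO's actual implementation: the KV<"topic:partition", offset>
element shape and the consumer config map are made-up placeholders for
illustration only.

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Sketch only: commits the offset for a "topic:partition" key once the bundle
// that processed it has been durably persisted by the runner.
class CommitOffsetOnFinalize extends DoFn<KV<String, Long>, Void> {
  // Must include bootstrap servers, group.id and byte-array deserializers.
  private final Map<String, Object> consumerConfig;

  CommitOffsetOnFinalize(Map<String, Object> consumerConfig) {
    this.consumerConfig = consumerConfig;
  }

  @ProcessElement
  public void processElement(
      @Element KV<String, Long> partitionAndOffset, BundleFinalizer finalizer) {
    String[] parts = partitionAndOffset.getKey().split(":");
    TopicPartition partition = new TopicPartition(parts[0], Integer.parseInt(parts[1]));
    long nextOffset = partitionAndOffset.getValue() + 1;

    // Best effort: the runner/SDK try to invoke this after the bundle's results
    // are durably committed, but the callback may never run (e.g. worker loss),
    // so the committed offset can lag behind what was actually processed.
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)), // expiry for the callback
        () -> {
          try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            consumer.commitSync(
                Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
          }
        });
  }
}

Because the callback may be dropped, the committed offset should always be
treated as "at or behind" the processed position, never ahead of it.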


On Wed, Jan 13, 2021 at 2:37 PM Yu Zhang  wrote:

> Hi Boyuan,
>
> Thanks for your explanation.
> So from my understanding, commit ParDo commit all messages in Reshuffle(),
> when user pipeline(use KafkaIO to read) fails and starts over, it will
> resume message in Reshuffle(), there might be duplicate messages but never
> lost messages in user pipeline.
>
> For option 2 using BundleFinalizer, in a connector’s sdf, the callback
> will commit offset. Since the bundle has been durably persisted by runner,
> even if user pipeline(use connecter) fails and starts over, runner will
> retries with the persisted bundle, which helps achieve at-least-once. If
> this is correct, what do you mean by best effort here?
>
> Thanks,
> Yu
>
> On Jan 12, 2021, at 16:55, Boyuan Zhang  wrote:
>
> Hi Yu,
>
> Reshuffle is treated as a persistent layer in the case I was talking
> about. For example, let's say that we have a simple pipeline like:
> Create("A", "B") -> Some ParDo -> Reshuffle() -> Your Commit ParDo and we
> have "A" has been outputted to Reshuffle() and "B" is still in Some ParDo.
> At this moment, Some ParDo fails and the runner restarts that bundle, then
> only "B" is going to be retired since "A" has been in the Reshuffle.
>
> Without Reshuffle(), you cannot guarantee that the elements has been
> committed will not be retied.
>
> On Tue, Jan 12, 2021 at 4:28 PM Yu Zhang  wrote:
>
>> Hi Boyuan,
>>
>> Thanks for the information you shared. For option 1 you mentioned below,
>> will there be any data loss if failures occur in *rest of pipeline*
>> while the *ParDo(PerformCommitSideEffects) *actually commits the data?
>> How Reshuffle() help perform commitment and achieve at least once
>> semantics?
>>
>> Thanks,
>> Yu
>>

Re: KafkaIO committing semantics

2021-01-12 Thread Boyuan Zhang
Hi Yu,

Reshuffle is treated as a persistence layer in the case I was talking about.
For example, let's say that we have a simple pipeline like:
Create("A", "B") -> Some ParDo -> Reshuffle() -> Your Commit ParDo
where "A" has already been output to Reshuffle() and "B" is still in Some ParDo.
If Some ParDo fails at this moment and the runner restarts that bundle, then
only "B" is going to be retried, since "A" is already in the Reshuffle.

Without Reshuffle(), you cannot guarantee that elements that have already been
committed will not be retried.
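
A minimal sketch of that example pipeline in the Beam Java SDK. The transform
names and the commit body are placeholders; the only point is where Reshuffle
sits relative to the commit ParDo.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class ReshuffleBeforeCommitExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> processed =
        p.apply(Create.of("A", "B"))
            .apply("SomeParDo", ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void process(@Element String e, OutputReceiver<String> out) {
                // If this bundle fails and is retried, elements that already made it
                // past the Reshuffle below are not produced (or committed) again.
                out.output(e);
              }
            }));

    processed
        // Reshuffle acts as the persistence point: "A" sitting here survives a
        // retry of SomeParDo's bundle, so only "B" would be reprocessed.
        .apply(Reshuffle.viaRandomKey())
        .apply("YourCommitParDo", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void process(@Element String e) {
            // Placeholder: perform the external commit/acknowledge side effect for e.
          }
        }));

    p.run();
  }
}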

On Tue, Jan 12, 2021 at 4:28 PM Yu Zhang  wrote:

> Hi Boyuan,
>
> Thanks for the information you shared. For option 1 you mentioned below,
> will there be any data loss if failures occur in *rest of pipeline* while
> the *ParDo(PerformCommitSideEffects) *actually commits the data? How
> Reshuffle() help perform commitment and achieve at least once semantics?
>
> Thanks,
> Yu
>


Re: KafkaIO committing semantics

2021-01-12 Thread Yu Zhang
Hi Boyuan,

Thanks for the information you shared. For option 1 you mentioned below, will
there be any data loss if a failure occurs in the rest of the pipeline while
ParDo(PerformCommitSideEffects) actually commits the data? How does Reshuffle()
help perform the commit and achieve at-least-once semantics?

Thanks,
Yu 

On 2020/09/10 17:05:28, Luke Cwik  wrote:
> +Boyuan Zhang
>
> You can perform commit like side effects like this in two ways:
> 1) Output commits to a downstream PCollection
> Read -> PCollection -> ... rest of pipeline ...
>              \-> PCollection -> Reshuffle -> ParDo(PerformCommitSideEffects)
>
> This method is preferred if you can perform a commit from a different
> worker and you're not bound to some inprocess state (e.g. JDBC connection)
> since it is guaranteed to happen and isn't best effort. It also is using
> the data path which is optimized to be as performant as possible.
>
> 2) Use the BundleFinalizer[1, 2] and register a callback after the bundle
> is durably persisted. This is best effort and exists since there are some
> APIs which have resident process state which can't be moved to another
> worker so the callback always comes back to the same machine.
>
> 1: https://s.apache.org/beam-finalizing-bundles
> 2: https://github.com/apache/beam/blob/1463ff08a4f782594dff64873d0cb70ca13d8f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1367

Re: KafkaIO committing semantics

2020-09-10 Thread Luke Cwik
+Boyuan Zhang 

You can perform commit-like side effects like this in two ways:
1) Output commits to a downstream PCollection
Read -> PCollection -> ... rest of pipeline ...
              \-> PCollection -> Reshuffle -> ParDo(PerformCommitSideEffects)

This method is preferred if you can perform the commit from a different
worker and you're not bound to some in-process state (e.g. a JDBC connection),
since it is guaranteed to happen and isn't best effort. It also uses
the data path, which is optimized to be as performant as possible.
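
As a rough illustration of that shape (not KafkaIO internals): a multi-output
ParDo fans the read output into the main branch and a commit branch that goes
through Reshuffle before the commit side effect. The broker address, topic,
tag names, "topic:partition" key encoding and the commit body are assumptions
made only for this sketch.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitViaDownstreamPCollection {
  // Main output: record values for the "... rest of pipeline ..." branch.
  static final TupleTag<String> RECORDS = new TupleTag<String>() {};
  // Side output: "topic:partition" -> offset, feeding the commit branch.
  static final TupleTag<KV<String, Long>> OFFSETS = new TupleTag<KV<String, Long>>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollectionTuple branches =
        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
            .apply("FanOut",
                ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
                  @ProcessElement
                  public void process(
                      @Element KafkaRecord<String, String> rec, MultiOutputReceiver out) {
                    out.get(RECORDS).output(rec.getKV().getValue());
                    out.get(OFFSETS).output(
                        KV.of(rec.getTopic() + ":" + rec.getPartition(), rec.getOffset()));
                  }
                }).withOutputTags(RECORDS, TupleTagList.of(OFFSETS)));

    // branches.get(RECORDS) feeds the "... rest of pipeline ..." branch.

    branches.get(OFFSETS)
        .apply(Reshuffle.viaRandomKey()) // element is durably persisted before committing
        .apply("PerformCommitSideEffects", ParDo.of(new DoFn<KV<String, Long>, Void>() {
          @ProcessElement
          public void process(@Element KV<String, Long> offset) {
            // Placeholder: commit `offset` back to Kafka (or another external system).
          }
        }));

    p.run();
  }
}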

2) Use the BundleFinalizer [1, 2] and register a callback that runs after the
bundle is durably persisted. This is best effort and exists because some APIs
have resident process state which can't be moved to another worker, so the
callback always comes back to the same machine.

1: https://s.apache.org/beam-finalizing-bundles
2:
https://github.com/apache/beam/blob/1463ff08a4f782594dff64873d0cb70ca13d8f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1367


On Wed, Sep 9, 2020 at 8:24 AM Alexey Romanenko 
wrote:

> Sorry, I can’ say much about SDF. Maybe Lukasz Cwik can provide more
> details on this.
>
> On 8 Sep 2020, at 09:01, Gaurav Nakum  wrote:
>
> Thank you very much for your explanation!
> commitOffsetsInFinalize() -> although checkpointing depends on the runner
> is it not configurable in a connector implementation?
> Basically, I want to understand how this can be done with a new IO
> connector implementation, esp. with the new *SDF* API. If I am right, in
> the traditional UnboundedSource API, checkpointing was configured using
> *UnboundedSource.CheckpointMark*, but I am not sure about the SDF API.
> Also, since KafkaIO SDF read does not provide *commitOffsetsInFinalize* 
> functionality
> could you point to some resources which discuss checkpointing using the new
> SDF API?
>
> Thank you,
> Gaurav
> On 9/7/20 10:54 AM, Alexey Romanenko wrote:
>
> From my understanding:
> - ENABLE_AUTO_COMMIT_CONFIG will say to Kafka consumer (used inside
> KafkaIO to read messages) to commit periodically offsets in the background;
> - on the other hand, if "commitOffsetsInFinalize()” is used, then Beam
> Checkpoint mechanism will be leveraged to restart from checkpoints in case
> of failures. It won’t need to wait for pipeline's finish, though it’s up to
> the runner to decide when and how often to save checkpoints.
>
> In KafkaIO, it’s possible to use* only one* option for the same transform
> - either ENABLE_AUTO_COMMIT_CONFIG or commitOffsetsInFinalize()
>
>
>
> On 6 Sep 2020, at 07:24, Apple  wrote:
>
> Hi everyone,
>
>
> I have a question on KafkaIO.
> What is the difference between setting *AUTO_COMMIT_CONFIG* and
> *commitOffsetsInFinalize()*? My understanding is that:
>
> 1.*AUTO_COMMIT_CONFIG* commits Kafka records as soon as
> KafkaIO.read() outputs messages, but I am not sure how would this be
> helpful, for e.g. if a consumer transform after KafkaIO.read() fails , the
> messages would be lost (which sounds like at-most once semantics)
>
> 2.*commitOffsetsFinalize()*  commits when the pipeline is
> finished. But when does the pipeline end? In other words, when is
> PipelineResult.State = Done in a streaming scenario?
>
> Thanks!
>
>
>


Re: KafkaIO committing semantics

2020-09-09 Thread Alexey Romanenko
Sorry, I can't say much about SDF. Maybe Lukasz Cwik can provide more details on
this.

> On 8 Sep 2020, at 09:01, Gaurav Nakum  wrote:
> 
> Thank you very much for your explanation!
> commitOffsetsInFinalize() -> although checkpointing depends on the runner is 
> it not configurable in a connector implementation?
> Basically, I want to understand how this can be done with a new IO connector 
> implementation, esp. with the new SDF API. If I am right, in the traditional 
> UnboundedSource API, checkpointing was configured using 
> UnboundedSource.CheckpointMark, but I am not sure about the SDF API. 
> Also, since KafkaIO SDF read does not provide commitOffsetsInFinalize 
> functionality could you point to some resources which discuss checkpointing 
> using the new SDF API?
> 
> Thank you,
> Gaurav
> On 9/7/20 10:54 AM, Alexey Romanenko wrote:
>> From my understanding: 
>> - ENABLE_AUTO_COMMIT_CONFIG will say to Kafka consumer (used inside KafkaIO 
>> to read messages) to commit periodically offsets in the background;
>> - on the other hand, if "commitOffsetsInFinalize()” is used, then Beam 
>> Checkpoint mechanism will be leveraged to restart from checkpoints in case 
>> of failures. It won’t need to wait for pipeline's finish, though it’s up to 
>> the runner to decide when and how often to save checkpoints.
>>  
>> In KafkaIO, it’s possible to use only one option for the same transform - 
>> either ENABLE_AUTO_COMMIT_CONFIG or commitOffsetsInFinalize() 
>>  
>>  
>>> On 6 Sep 2020, at 07:24, Apple >> > wrote:
>>>  
>>> Hi everyone, 
>>> 
>>> 
>>> I have a question on KafkaIO.
>>> What is the difference between setting AUTO_COMMIT_CONFIG and 
>>> commitOffsetsInFinalize()? My understanding is that:
>>>  
>>> 1.AUTO_COMMIT_CONFIG commits Kafka records as soon as 
>>> KafkaIO.read() outputs messages, but I am not sure how would this be 
>>> helpful, for e.g. if a consumer transform after KafkaIO.read() fails , the 
>>> messages would be lost (which sounds like at-most once semantics)
>>>  
>>> 2.commitOffsetsFinalize()  commits when the pipeline is 
>>> finished. But when does the pipeline end? In other words, when is 
>>> PipelineResult.State = Done in a streaming scenario?
>>>  
>>> Thanks!



Re: KafkaIO committing semantics

2020-09-08 Thread Gaurav Nakum
Thank you very much for your explanation!

commitOffsetsInFinalize() -> although checkpointing depends on the runner, is it
not configurable in a connector implementation?

Basically, I want to understand how this can be done with a new IO connector 
implementation, esp. with the new SDF API. If I am right, in the traditional 
UnboundedSource API, checkpointing was configured using 
UnboundedSource.CheckpointMark, but I am not sure about the SDF API. 

Also, since KafkaIO SDF read does not provide commitOffsetsInFinalize
functionality, could you point to some resources which discuss checkpointing
using the new SDF API?

Thank you,
Gaurav

 

On 9/7/20 10:54 AM, Alexey Romanenko wrote:

From my understanding:

- ENABLE_AUTO_COMMIT_CONFIG will say to Kafka consumer (used inside KafkaIO to 
read messages) to commit periodically offsets in the background;

- on the other hand, if "commitOffsetsInFinalize()” is used, then Beam 
Checkpoint mechanism will be leveraged to restart from checkpoints in case of 
failures. It won’t need to wait for pipeline's finish, though it’s up to the 
runner to decide when and how often to save checkpoints.

 

In KafkaIO, it’s possible to use only one option for the same transform - 
either ENABLE_AUTO_COMMIT_CONFIG or commitOffsetsInFinalize() 

 

 

On 6 Sep 2020, at 07:24, Apple  wrote:

 

Hi everyone, 


I have a question on KafkaIO.
What is the difference between setting AUTO_COMMIT_CONFIG and 
commitOffsetsInFinalize()? My understanding is that:

 

1.AUTO_COMMIT_CONFIG commits Kafka records as soon as 
KafkaIO.read() outputs messages, but I am not sure how would this be helpful, 
for e.g. if a consumer transform after KafkaIO.read() fails , the messages 
would be lost (which sounds like at-most once semantics)

 

2.commitOffsetsFinalize()  commits when the pipeline is finished. 
But when does the pipeline end? In other words, when is PipelineResult.State = 
Done in a streaming scenario?

 

Thanks!

 



Re: KafkaIO committing semantics

2020-09-07 Thread Alexey Romanenko
From my understanding:
- ENABLE_AUTO_COMMIT_CONFIG tells the Kafka consumer (used inside KafkaIO to
read messages) to periodically commit offsets in the background;
- on the other hand, if "commitOffsetsInFinalize()" is used, then the Beam
checkpoint mechanism will be leveraged to restart from checkpoints in case of
failures. It doesn't need to wait for the pipeline to finish, though it's up to
the runner to decide when and how often to save checkpoints.

In KafkaIO, it's possible to use only one of these options for the same
transform - either ENABLE_AUTO_COMMIT_CONFIG or commitOffsetsInFinalize()
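
To make that concrete, here is a rough sketch of the two mutually exclusive
configurations. The broker address, topic and group id are placeholders,
ImmutableMap is Guava (used only for brevity), and withConsumerConfigUpdates
assumes a reasonably recent Beam release; a real pipeline would pick one of
the two reads.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCommitOptions {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Option A: the Kafka consumer auto-commits offsets periodically in the background,
    // independently of whether downstream transforms have processed the records.
    PCollection<KV<String, String>> autoCommit =
        p.apply("ReadWithAutoCommit",
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(ImmutableMap.<String, Object>of(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
                .withoutMetadata());

    // Option B: offsets are committed when Beam finalizes a checkpoint/bundle,
    // i.e. after the read output has been durably persisted by the runner.
    PCollection<KV<String, String>> commitInFinalize =
        p.apply("ReadWithCommitOnFinalize",
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(ImmutableMap.<String, Object>of(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
                .commitOffsetsInFinalize()
                .withoutMetadata());

    p.run();
  }
}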


> On 6 Sep 2020, at 07:24, Apple  wrote:
> 
> Hi everyone, 
> 
> I have a question on KafkaIO.
> What is the difference between setting AUTO_COMMIT_CONFIG and 
> commitOffsetsInFinalize()? My understanding is that:
>  
> 1.AUTO_COMMIT_CONFIG commits Kafka records as soon as 
> KafkaIO.read() outputs messages, but I am not sure how would this be helpful, 
> for e.g. if a consumer transform after KafkaIO.read() fails , the messages 
> would be lost (which sounds like at-most once semantics)
>  
> 2.commitOffsetsFinalize()  commits when the pipeline is finished. 
> But when does the pipeline end? In other words, when is PipelineResult.State 
> = Done in a streaming scenario?
>  
> Thanks!