Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

2022-05-09 Thread Neng Lu
Regarding your question "why AUTO_ACK is designed this way":

I think that when it was first implemented, AUTO_ACK was just a 
convenient way to help users ack messages.

We can discuss the gap between expected behavior and actual behavior and try to 
resolve or simplify it.

On 2022/05/10 01:14:07 Baozi wrote:
> Thanks reply,
> 
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> > If AUTO_ACK is  FALSE, then the acking will be done by Sink implementation.
> 
> A little confused, I want to know why AUTO_ACK is designed this way.
> 
> I'll give another example:
> 
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> 
> 
> And if Guarantees != ATMOST_ONCE,then the JavaInstanceRunnable not will ack 
> message.
> 
> > JavaInstanceRunnable#run():Line273
> 
> 
> Thanks,
> Baodi Shi
> 
> > On 2022-05-10 01:00:09, Neng Lu wrote:
> > 
> > Thanks for this detailed discussion about processing guarantee and ack.
> > These two settings are together affecting the behavior of a running 
> > function.
> > 
> > One thing I want to clarify is: 
> > AUTO_ACK setting means if the function runtime will ack messages or not. 
> > ("function runtime" here specifically refers to the JavaInstanceRunnable. 
> > If the ack happens inside a sink's implemented write method, it's not 
> > auto-ack). 
> > 
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> > If AUTO_ACK is  FALSE, then the acking will be done by Sink implementation.
> > 
> > Now with this context, let's review your two scenarios:
> > 
> >> 1.If the user set Guarantees == ATMOST_ONCE and autoAck == false.
> > To be precise, the processing semantics is not ATLEAST_ONCE. It's actually 
> > left to the implemented Sink to decide which semantics it is. It can be 
> > ATMOST_ONCE, ATLEAST_ONCE and probably EFFECTIVELY_ONCE.
> > 
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> >> false
> > This behavior is actually correct based on our previous context.
> > 
> > A real problematic scenario here is when USER sets 
> > ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the 
> > JavaInstanceRunnable can ack for use under these cases. So there should be 
> > some check to ban user submit function with such configs.
> > 
> > 
> > 
> > On 2022/05/09 09:02:12 Baozi wrote:
> >> Hi, guys:
> >> 
> >> I found out that autoAck configuration in function framework now affects 
> >> Delivery semantics, and make it difficult for users to understand. Refer 
> >> to the following two scenarios.
> >> 
> >> 1. If the user understands that the semantics of Guarantees shall prevail
> >> 
> >> If the user set Guarantees == ATMOST_ONCE and autoAck == false. Then the 
> >> processing semantics of the actual Function will become ATLEAST_ONCE. 
> >> Refer to the following code, this scenario will not immediately ack.
> >> 
> >> JavaInstanceRunnable#run():Line273  
> >> 
> >> if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
> >> org.apache.pulsar.functions
> >>.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
> >>if (instanceConfig.getFunctionDetails().getAutoAck()) { // just when 
> >> autoAck == true to auto ack
> >>currentRecord.ack();
> >>}
> >> }
> >> 
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> >> false
> >> 
> >> According to the following code, the framework is still automatically 
> >> acked.
> >> 
> >> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 
> >> 
> >> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 
> >> 
> >> 
> >> public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord 
> >> record) {
> >>msg.sendAsync()
> >>.thenAccept(messageId -> record.ack()) 
> >>.exceptionally(getPublishErrorHandler(record, true));
> >> }
> >> 
> >> To sum up, users may be confused when configuring Guarantees and autoAck, 
> >> and cannot judge their correct expected behavior.
> >> 
> >> I would like to discuss whether it is possible to cancel the autoAck 
> >> configuration and add a CUSTOM type for Guarantees.
> >> 
> >> switch (processingGuarantees) {
> >>case Guarantees.ATMOST_ONCE: After the framework consumes the message, 
> >> it immediately acks
> >>case Guarantees.ATLEAST_ONCE: After processing on the source side, 
> >> perform ack again
> >>case 

Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

2022-05-09 Thread Neng Lu
> For users, sink is also part of the function framework.

^^ Is this written in any Pulsar documentation? If you look at the code 
closely, the source and sink are actually configurable in the Java runtime. 
Users can provide their own source/sink implementations.
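
As a rough illustration of a user-provided sink that acks on its own rather than relying on the runtime, here is a minimal sketch. The `AckableRecord`, `SimpleSink`, `CollectingSink`, and `CountingRecord` types are hypothetical stand-ins written for this example; they are not the real Pulsar IO interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record that can be acknowledged (not a Pulsar type).
interface AckableRecord<T> {
    T value();
    void ack();
}

// Hypothetical stand-in for a sink with a write method (not a Pulsar type).
interface SimpleSink<T> {
    void write(AckableRecord<T> record);
}

// A user-provided sink that stores the value and then acks the record itself.
final class CollectingSink implements SimpleSink<String> {
    final List<String> written = new ArrayList<>();

    @Override
    public void write(AckableRecord<String> record) {
        written.add(record.value()); // "write" the value first
        record.ack();                // then ack inside the sink, not in the runtime
    }
}

// Test helper that counts how many times ack() was called.
final class CountingRecord implements AckableRecord<String> {
    private final String value;
    int ackCount = 0;

    CountingRecord(String value) {
        this.value = value;
    }

    @Override
    public String value() {
        return value;
    }

    @Override
    public void ack() {
        ackCount++;
    }
}
```

In this sketch the ack happens inside `write`, so by the definition above it would not count as auto-ack.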

On 2022/05/10 01:38:48 Baozi wrote:
> > AUTO_ACK setting means if the function runtime will ack messages or not. 
> > ("function runtime" here specifically refers to the JavaInstanceRunnable. 
> > If the ack happens inside a sink's implemented write method, it's not 
> > auto-ack). 
> The description of the official website document is:Whether or not the 
> framework acknowledges messages automatically.
> For users, sink is also part of the function framework.
> 
> 
> Thanks,
> Baodi Shi
> 
> > On 2022-05-10 09:14:07, Baozi wrote:
> > 
> > Thanks for this detailed discussion about processing guarantee and ack.
> > These two settings are together affecting the behavior of a running 
> > function.
> > 
> > One thing I want to clarify is: 
> > AUTO_ACK setting means if the function runtime will ack messages or not. 
> > ("function runtime" here specifically refers to the JavaInstanceRunnable. 
> > If the ack happens inside a sink's implemented write method, it's not 
> > auto-ack). 
> > 
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> > If AUTO_ACK is FALSE, then the acking will be done by Sink implementation.
> > 
> > Now with this context, let's review your two scenarios:
> > 
> >> 1.If the user set Guarantees == ATMOST_ONCE and autoAck == false.
> > To be precise, the processing semantics is not ATLEAST_ONCE. It's actually 
> > left to the implemented Sink to decide which semantics it is. It can be 
> > ATMOST_ONCE, ATLEAST_ONCE and probably EFFECTIVELY_ONCE.
> > 
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> >> false
> > This behavior is actually correct based on our previous context.
> > 
> > A real problematic scenario here is when USER sets 
> > ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the 
> > JavaInstanceRunnable can ack for use under these cases. So there should be 
> > some check to ban user submit function with such configs.
> > 
> > 
> > 
> > On 2022/05/09 09:02:12 Baozi wrote:
> >> Hi, guys:
> >> 
> >> I found out that autoAck configuration in function framework now affects 
> >> Delivery semantics, and make it difficult for users to understand. Refer 
> >> to the following two scenarios.
> >> 
> >> 1. If the user understands that the semantics of Guarantees shall prevail
> >> 
> >> If the user set Guarantees == ATMOST_ONCE and autoAck == false. Then the 
> >> processing semantics of the actual Function will become ATLEAST_ONCE. 
> >> Refer to the following code, this scenario will not immediately ack.
> >> 
> >> JavaInstanceRunnable#run():Line273 
> >> 
> >> if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
> >> org.apache.pulsar.functions
> >> .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
> >> if (instanceConfig.getFunctionDetails().getAutoAck()) { // just when 
> >> autoAck == true to auto ack
> >> currentRecord.ack();
> >> }
> >> }
> >> 
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> >> false
> >> 
> >> According to the following code, the framework is still automatically 
> >> acked.
> >> 
> >> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 
> >> 
> >> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 
> >> 
> >> 
> >> public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord 
> >> record) {
> >> msg.sendAsync()
> >> .thenAccept(messageId -> record.ack()) 
> >> .exceptionally(getPublishErrorHandler(record, true));
> >> }
> >> 
> >> To sum up, users may be confused when configuring Guarantees and autoAck, 
> >> and cannot judge their 

Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

2022-05-09 Thread Baozi
> AUTO_ACK setting means if the function runtime will ack messages or not. 
> ("function runtime" here specifically refers to the JavaInstanceRunnable. If 
> the ack happens inside a sink's implemented write method, it's not auto-ack). 
The official website documentation describes it as: "Whether or not the 
framework acknowledges messages automatically."
For users, sink is also part of the function framework.


Thanks,
Baodi Shi

> On 2022-05-10 09:14:07, Baozi wrote:
> 
> Thanks for this detailed discussion about processing guarantee and ack.
> These two settings are together affecting the behavior of a running function.
> 
> One thing I want to clarify is: 
> AUTO_ACK setting means if the function runtime will ack messages or not. 
> ("function runtime" here specifically refers to the JavaInstanceRunnable. If 
> the ack happens inside a sink's implemented write method, it's not auto-ack). 
> 
> If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> If AUTO_ACK is FALSE, then the acking will be done by Sink implementation.
> 
> Now with this context, let's review your two scenarios:
> 
>> 1.If the user set Guarantees == ATMOST_ONCE and autoAck == false.
> To be precise, the processing semantics is not ATLEAST_ONCE. It's actually 
> left to the implemented Sink to decide which semantics it is. It can be 
> ATMOST_ONCE, ATLEAST_ONCE and probably EFFECTIVELY_ONCE.
> 
>> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
>> false
> This behavior is actually correct based on our previous context.
> 
> A real problematic scenario here is when USER sets 
> ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the 
> JavaInstanceRunnable can ack for use under these cases. So there should be 
> some check to ban user submit function with such configs.
> 
> 
> 
> On 2022/05/09 09:02:12 Baozi wrote:
>> Hi, guys:
>> 
>> I found out that autoAck configuration in function framework now affects 
>> Delivery semantics, and make it difficult for users to understand. Refer to 
>> the following two scenarios.
>> 
>> 1. If the user understands that the semantics of Guarantees shall prevail
>> 
>> If the user set Guarantees == ATMOST_ONCE and autoAck == false. Then the 
>> processing semantics of the actual Function will become ATLEAST_ONCE. Refer 
>> to the following code, this scenario will not immediately ack.
>> 
>> JavaInstanceRunnable#run():Line273 
>> 
>> if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
>> org.apache.pulsar.functions
>> .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
>> if (instanceConfig.getFunctionDetails().getAutoAck()) { // just when autoAck 
>> == true to auto ack
>> currentRecord.ack();
>> }
>> }
>> 
>> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
>> false
>> 
>> According to the following code, the framework is still automatically acked.
>> 
>> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 
>> 
>> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 
>> 
>> 
>> public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord 
>> record) {
>> msg.sendAsync()
>> .thenAccept(messageId -> record.ack()) 
>> .exceptionally(getPublishErrorHandler(record, true));
>> }
>> 
>> To sum up, users may be confused when configuring Guarantees and autoAck, 
>> and cannot judge their correct expected behavior.
>> 
>> I would like to discuss whether it is possible to cancel the autoAck 
>> configuration and add a CUSTOM type for Guarantees.
>> 
>> switch (processingGuarantees) {
>>  case Guarantees.ATMOST_ONCE: After the framework consumes the message, 
>> it immediately acks
>>  case Guarantees.ATLEAST_ONCE: After processing on the source side, 
>> perform ack again
>>  case Guarantees.EFFECTIVELY_ONCE: After processing on the source side, 
>> perform ack again
>>  case 

Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

2022-05-09 Thread Baozi
Thanks for the reply,

> If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> If AUTO_ACK is  FALSE, then the acking will be done by Sink implementation.

I'm a little confused; I'd like to know why AUTO_ACK is designed this way.

I'll give another example:

> If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.


And if Guarantees != ATMOST_ONCE, then the JavaInstanceRunnable will not ack 
the message.

> JavaInstanceRunnable#run():Line273
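
To make the example concrete, here is a small sketch that enumerates the combinations. It models only the condition quoted from JavaInstanceRunnable#run() in this thread; the `Guarantee` enum and `RuntimeAckTable` class are hypothetical names for illustration, and other ack paths (such as the sink processors) are not modeled.

```java
// Hypothetical stand-in for the ProcessingGuarantees enum; not the Pulsar class.
enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

final class RuntimeAckTable {
    // Mirrors the quoted condition: the runtime acks only when the guarantee
    // is ATMOST_ONCE and autoAck is true.
    static boolean runtimeAcks(Guarantee guarantee, boolean autoAck) {
        return guarantee == Guarantee.ATMOST_ONCE && autoAck;
    }

    public static void main(String[] args) {
        // Print one row per (guarantee, autoAck) combination.
        for (Guarantee g : Guarantee.values()) {
            for (boolean autoAck : new boolean[] {true, false}) {
                System.out.printf("%-16s autoAck=%-5b -> runtime acks: %b%n",
                        g, autoAck, runtimeAcks(g, autoAck));
            }
        }
    }
}
```

Only one of the six combinations results in the runtime acking, which is the point of the example above.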


Thanks,
Baodi Shi

> On 2022-05-10 01:00:09, Neng Lu wrote:
> 
> Thanks for this detailed discussion about processing guarantee and ack.
> These two settings are together affecting the behavior of a running function.
> 
> One thing I want to clarify is: 
> AUTO_ACK setting means if the function runtime will ack messages or not. 
> ("function runtime" here specifically refers to the JavaInstanceRunnable. If 
> the ack happens inside a sink's implemented write method, it's not auto-ack). 
> 
> If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> If AUTO_ACK is  FALSE, then the acking will be done by Sink implementation.
> 
> Now with this context, let's review your two scenarios:
> 
>> 1.If the user set Guarantees == ATMOST_ONCE and autoAck == false.
> To be precise, the processing semantics is not ATLEAST_ONCE. It's actually 
> left to the implemented Sink to decide which semantics it is. It can be 
> ATMOST_ONCE, ATLEAST_ONCE and probably EFFECTIVELY_ONCE.
> 
>> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
>> false
> This behavior is actually correct based on our previous context.
> 
> A real problematic scenario here is when USER sets 
> ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the 
> JavaInstanceRunnable can ack for use under these cases. So there should be 
> some check to ban user submit function with such configs.
> 
> 
> 
> On 2022/05/09 09:02:12 Baozi wrote:
>> Hi, guys:
>> 
>> I found out that autoAck configuration in function framework now affects 
>> Delivery semantics, and make it difficult for users to understand. Refer to 
>> the following two scenarios.
>> 
>> 1. If the user understands that the semantics of Guarantees shall prevail
>> 
>> If the user set Guarantees == ATMOST_ONCE and autoAck == false. Then the 
>> processing semantics of the actual Function will become ATLEAST_ONCE. Refer 
>> to the following code, this scenario will not immediately ack.
>> 
>> JavaInstanceRunnable#run():Line273  
>> 
>> if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == 
>> org.apache.pulsar.functions
>>.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
>>if (instanceConfig.getFunctionDetails().getAutoAck()) { // just when 
>> autoAck == true to auto ack
>>currentRecord.ack();
>>}
>> }
>> 
>> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
>> false
>> 
>> According to the following code, the framework is still automatically acked.
>> 
>> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 
>> 
>> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 
>> 
>> 
>> public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord 
>> record) {
>>msg.sendAsync()
>>.thenAccept(messageId -> record.ack()) 
>>.exceptionally(getPublishErrorHandler(record, true));
>> }
>> 
>> To sum up, users may be confused when configuring Guarantees and autoAck, 
>> and cannot judge their correct expected behavior.
>> 
>> I would like to discuss whether it is possible to cancel the autoAck 
>> configuration and add a CUSTOM type for Guarantees.
>> 
>> switch (processingGuarantees) {
>>  case Guarantees.ATMOST_ONCE: After the framework consumes the message, 
>> it immediately acks
>>  case Guarantees.ATLEAST_ONCE: After processing on the source side, 
>> perform ack again
>>  case Guarantees.EFFECTIVELY_ONCE: After processing on the source side, 
>> perform ack again
>>  case  Guarantees.CUSTOM: The function framework does not help users 
>> with any ack operations and semantic guarantees
>> }
>> 
>> If you have any ideas, welcome to discuss. If everyone agrees with this 
>> idea, I will mention a PIP to promote implementation.
>> 
>> Thanks,
>> Baodi Shi
>> 
>> 



Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

2022-05-09 Thread Neng Lu
Thanks for this detailed discussion about processing guarantees and acks.
These two settings together affect the behavior of a running function.

One thing I want to clarify: 
The AUTO_ACK setting determines whether the function runtime acks messages. 
("Function runtime" here refers specifically to the JavaInstanceRunnable. If 
the ack happens inside a sink's implemented write method, it's not auto-ack.) 

If AUTO_ACK is TRUE, then the JavaInstanceRunnable acks messages.
If AUTO_ACK is FALSE, then the acking is done by the Sink implementation.

Now with this context, let's review your two scenarios:

> 1.If the user set Guarantees == ATMOST_ONCE and autoAck == false.
To be precise, the processing semantics are not ATLEAST_ONCE. It's actually left 
to the Sink implementation to decide which semantics apply. They can be 
ATMOST_ONCE, ATLEAST_ONCE, or probably EFFECTIVELY_ONCE.

> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> false
This behavior is actually correct based on our previous context.

The really problematic scenario here is when the user sets 
ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the 
JavaInstanceRunnable can ack for the user in these cases. So there should be a 
check that prevents users from submitting functions with such configs.
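
A minimal sketch of what such a check could look like, assuming it runs at submission time. The `ProcessingGuarantee` enum and the `AutoAckConfigCheck.validate` helper are hypothetical names for this example, not existing Pulsar code.

```java
// Hypothetical stand-in for the ProcessingGuarantees enum; not the Pulsar class.
enum ProcessingGuarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

final class AutoAckConfigCheck {
    // Rejects the combination described above: autoAck=true together with
    // ATLEAST_ONCE or EFFECTIVELY_ONCE. All other combinations pass.
    static void validate(ProcessingGuarantee guarantee, boolean autoAck) {
        if (autoAck && guarantee != ProcessingGuarantee.ATMOST_ONCE) {
            throw new IllegalArgumentException(
                    "autoAck=true cannot be combined with " + guarantee);
        }
    }

    public static void main(String[] args) {
        validate(ProcessingGuarantee.ATMOST_ONCE, true); // accepted
        try {
            validate(ProcessingGuarantee.ATLEAST_ONCE, true); // rejected
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Where exactly such a check would live (client side, worker side, or both) is left open here.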



On 2022/05/09 09:02:12 Baozi wrote:
> Hi, guys:
> 
> I found that the autoAck configuration in the function framework currently 
> affects delivery semantics, which makes it difficult for users to 
> understand. Consider the following two scenarios.
> 
> 1. If the user understands that the semantics of Guarantees shall prevail
> 
> If the user sets Guarantees == ATMOST_ONCE and autoAck == false, then the 
> processing semantics of the actual Function become ATLEAST_ONCE. As the 
> following code shows, this scenario will not ack immediately.
> 
> JavaInstanceRunnable#run():Line273  
> 
> if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
>         == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
>     // auto ack only when autoAck == true
>     if (instanceConfig.getFunctionDetails().getAutoAck()) {
>         currentRecord.ack();
>     }
> }
> 
> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> false
> 
> According to the following code, the framework still acks automatically.
> 
> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 
> 
> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 
> 
> 
> public void sendOutputMessage(TypedMessageBuilder msg, SinkRecord record) {
>     msg.sendAsync()
>         .thenAccept(messageId -> record.ack())
>         .exceptionally(getPublishErrorHandler(record, true));
> }
> 
> To sum up, users may be confused when configuring Guarantees and autoAck, and 
> cannot tell what behavior to expect.
> 
> I would like to discuss whether it is possible to cancel the autoAck 
> configuration and add a CUSTOM type for Guarantees.
> 
> switch (processingGuarantees) {
>   case Guarantees.ATMOST_ONCE: After the framework consumes the message, it immediately acks
>   case Guarantees.ATLEAST_ONCE: After processing on the source side, perform ack again
>   case Guarantees.EFFECTIVELY_ONCE: After processing on the source side, perform ack again
>   case Guarantees.CUSTOM: The function framework does not help users with any ack operations and semantic guarantees
> }
> 
> If you have any ideas, you are welcome to discuss them. If everyone agrees 
> with this idea, I will submit a PIP to drive the implementation.
> 
> Thanks,
> Baodi Shi
> 
>