Thanks for this detailed discussion about processing guarantees and acking.
These two settings together affect the behavior of a running function.

One thing I want to clarify: 
the AUTO_ACK setting determines whether the function runtime acks messages. 
("Function runtime" here specifically refers to the JavaInstanceRunnable. If 
the ack happens inside a sink's implemented write method, it's not auto-ack.) 

If AUTO_ACK is TRUE, then the JavaInstanceRunnable acks messages.
If AUTO_ACK is FALSE, then the acking is left to the Sink implementation.
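
To make the split concrete, here is a minimal sketch of who ends up calling
ack() in each case. It does not use the real Pulsar classes: Record, Sink,
SelfAckingSink, PassiveSink and process() are hypothetical stand-ins, and
process() only plays the role of the JavaInstanceRunnable loop in a very
simplified form (it ignores the processing guarantee entirely).

```java
import java.util.ArrayList;
import java.util.List;

class AutoAckSketch {
    /** Hypothetical stand-in for a consumed record; it only counts ack calls. */
    static class Record {
        final String value;
        int ackCount = 0;

        Record(String value) {
            this.value = value;
        }

        void ack() {
            ackCount++;
        }
    }

    /** Hypothetical stand-in for a Sink implementation. */
    interface Sink {
        void write(Record record);
    }

    /** Acks inside write(): this is the sink's own ack, not auto-ack. */
    static class SelfAckingSink implements Sink {
        final List<String> written = new ArrayList<>();

        @Override
        public void write(Record record) {
            written.add(record.value);
            record.ack();
        }
    }

    /** Writes the record but never acks it. */
    static class PassiveSink implements Sink {
        final List<String> written = new ArrayList<>();

        @Override
        public void write(Record record) {
            written.add(record.value);
        }
    }

    /** Simplified stand-in for the runtime loop; returns how often the record was acked. */
    static int process(String value, Sink sink, boolean autoAck) {
        Record record = new Record(value);
        sink.write(record);
        if (autoAck) {
            record.ack(); // the runtime acks only when AUTO_ACK is TRUE
        }
        return record.ackCount;
    }

    public static void main(String[] args) {
        // AUTO_ACK true, sink does not ack: the runtime acks once.
        System.out.println(process("a", new PassiveSink(), true));
        // AUTO_ACK false, sink acks in write(): the sink acks once.
        System.out.println(process("b", new SelfAckingSink(), false));
        // AUTO_ACK false, sink does not ack: nobody acks.
        System.out.println(process("c", new PassiveSink(), false));
    }
}
```

The last case in main() is the one to watch: with AUTO_ACK false and a sink
that never acks, the record is simply never acknowledged.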

Now with this context, let's review your two scenarios:

> 1. If the user sets Guarantees == ATMOST_ONCE and autoAck == false.
To be precise, the processing semantics is not ATLEAST_ONCE. It's actually left 
to the implemented Sink to decide which semantics it is. It can be ATMOST_ONCE, 

> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> false
This behavior is actually correct based on our previous context.

A real problematic scenario here is when USER sets 
JavaInstanceRunnable can ack for use under these cases. So there should be a 
check that prevents users from submitting a function with such configs.
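
A submit-time check of that kind could look roughly like the sketch below.
Everything in it is hypothetical (ConfigCheckSketch, Guarantee, key(),
validate()); it is not the existing Pulsar validation code. Which
combinations should actually be rejected is not decided here, so the caller
passes the rejected set in, and the pair used in main() is only a placeholder.

```java
import java.util.HashSet;
import java.util.Set;

class ConfigCheckSketch {
    /** Hypothetical mirror of the three guarantees discussed in this thread. */
    enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

    /** Builds a lookup key for one (guarantee, autoAck) combination. */
    static String key(Guarantee guarantee, boolean autoAck) {
        return guarantee.name() + ":" + autoAck;
    }

    /** Throws if the combination is in the caller-supplied rejected set. */
    static void validate(Guarantee guarantee, boolean autoAck, Set<String> rejected) {
        if (rejected.contains(key(guarantee, autoAck))) {
            throw new IllegalArgumentException(
                    "Unsupported combination: guarantees=" + guarantee
                            + ", autoAck=" + autoAck);
        }
    }

    public static void main(String[] args) {
        // Placeholder pair for illustration only; not a claim about which
        // configs are actually invalid.
        Set<String> rejected = new HashSet<>();
        rejected.add(key(Guarantee.ATLEAST_ONCE, false));

        validate(Guarantee.ATLEAST_ONCE, true, rejected); // accepted
        try {
            validate(Guarantee.ATLEAST_ONCE, false, rejected);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing at submission keeps the problem visible to the user, rather than
surfacing later as messages that nobody acks.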

On 2022/05/09 09:02:12 Baozi wrote:
> Hi, guys:
> I found that the autoAck configuration in the function framework currently 
> affects delivery semantics, which makes it difficult for users to understand. 
> Consider the following two scenarios.
> 1. If the user understands that the semantics of Guarantees shall prevail
> If the user set Guarantees == ATMOST_ONCE and autoAck == false. Then the 
> processing semantics of the actual Function will become ATLEAST_ONCE. Refer 
> to the following code, this scenario will not immediately ack.
> JavaInstanceRunnable#run():Line273  
> if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
>         == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
>     // just when autoAck == true to auto ack
>     if (instanceConfig.getFunctionDetails().getAutoAck()) {
>         currentRecord.ack();
>     }
> }
> 2. If the user thinks that the framework doesn’t auto ack when autoAck == 
> false
> According to the following code, the framework still acks automatically.
> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 
> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 
> public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
>     msg.sendAsync()
>             .thenAccept(messageId -> record.ack()) 
>             .exceptionally(getPublishErrorHandler(record, true));
> }
> To sum up, users may be confused when configuring Guarantees and autoAck, and 
> cannot tell what behavior to expect.
> I would like to discuss whether it is possible to remove the autoAck 
> configuration and add a CUSTOM type for Guarantees.
> switch (processingGuarantees) {
>       case Guarantees.ATMOST_ONCE:
>           // After the framework consumes the message, it immediately acks
>       case Guarantees.ATLEAST_ONCE:
>           // After processing on the source side, perform ack again
>       case Guarantees.EFFECTIVELY_ONCE:
>           // After processing on the source side, perform ack again
>       case Guarantees.CUSTOM:
>           // The function framework does not help users with any ack
>           // operations and semantic guarantees
> }
> If you have any ideas, you are welcome to discuss. If everyone agrees with 
> this idea, I will propose a PIP to drive the implementation.
> Thanks,
> Baodi Shi
