> The AUTO_ACK setting determines whether the function runtime acks messages.
> ("function runtime" here specifically refers to the JavaInstanceRunnable. If
> the ack happens inside a sink's implemented write method, it's not auto-ack).
The official documentation describes it as: "Whether or not the
framework acknowledges messages automatically."
From the user's perspective, the sink is also part of the function framework.
Thanks,
Baodi Shi
> On May 10, 2022, at 09:14, Baozi <[email protected]> wrote:
>
> Thanks for this detailed discussion about processing guarantee and ack.
> These two settings are together affecting the behavior of a running function.
>
> One thing I want to clarify is:
> The AUTO_ACK setting determines whether the function runtime acks messages.
> ("function runtime" here specifically refers to the JavaInstanceRunnable. If
> the ack happens inside a sink's implemented write method, it's not auto-ack).
>
> If AUTO_ACK is TRUE, the JavaInstanceRunnable acks messages.
> If AUTO_ACK is FALSE, the acking is done by the Sink implementation.
>
> Now with this context, let's review your two scenarios:
>
>> 1. If the user sets Guarantees == ATMOST_ONCE and autoAck == false.
> To be precise, the processing semantics are not necessarily ATLEAST_ONCE.
> It's actually left to the Sink implementation to decide which semantics
> apply: they can be ATMOST_ONCE, ATLEAST_ONCE, or possibly EFFECTIVELY_ONCE.
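To illustrate how a sink that acks inside its own write method ends up choosing the semantics, here is a minimal, self-contained sketch. `AckableRecord`, `ListSink`, and the `ackBeforeWrite` flag are hypothetical stand-ins, not the Pulsar `Sink`/`Record` API. Acking before the external write behaves like at-most-once (a failure after the ack loses the message), while acking after a successful write behaves like at-least-once (a failure before the ack leads to redelivery).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Pulsar Record; it only tracks whether ack() was called.
class AckableRecord {
    final String value;
    boolean acked = false;

    AckableRecord(String value) {
        this.value = value;
    }

    void ack() {
        acked = true;
    }
}

// Hypothetical sink that acks inside write(), so it alone decides the semantics.
class ListSink {
    final List<String> external = new ArrayList<>(); // stands in for the external system
    private final boolean ackBeforeWrite;

    ListSink(boolean ackBeforeWrite) {
        this.ackBeforeWrite = ackBeforeWrite;
    }

    void write(AckableRecord record, boolean failDuringWrite) {
        if (ackBeforeWrite) {
            record.ack(); // at-most-once: acked even if the write below fails
        }
        if (failDuringWrite) {
            throw new IllegalStateException("simulated external write failure");
        }
        external.add(record.value);
        if (!ackBeforeWrite) {
            record.ack(); // at-least-once: acked only after a successful write
        }
    }
}
```

The `failDuringWrite` argument simulates a crash between the write and the ack; the runtime never acks in this model, which matches the AUTO_ACK = FALSE case described above.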
>
>> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
>> false
> This behavior is actually correct given the definition above.
>
> The genuinely problematic scenario is when the user sets
> ATLEAST_ONCE/EFFECTIVELY_ONCE together with AUTO_ACK=true. I don't think the
> JavaInstanceRunnable can ack for the user in these cases, so there should be
> a check that rejects function submissions with such configs.
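A minimal sketch of the submission-time check suggested here, assuming the rule is simply "reject ATLEAST_ONCE or EFFECTIVELY_ONCE combined with autoAck = true". The `Guarantee` enum and `FunctionConfigCheck.validate` are hypothetical; they mirror the names of the `ProcessingGuarantees` values but are not the Pulsar admin or validation API.

```java
// Hypothetical mirror of the ProcessingGuarantees values discussed in this thread.
enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

final class FunctionConfigCheck {
    private FunctionConfigCheck() {}

    // Rejects the combination flagged above as problematic; all others pass.
    static void validate(Guarantee guarantee, boolean autoAck) {
        boolean strongGuarantee = guarantee == Guarantee.ATLEAST_ONCE
                || guarantee == Guarantee.EFFECTIVELY_ONCE;
        if (strongGuarantee && autoAck) {
            throw new IllegalArgumentException(
                    guarantee + " cannot be combined with autoAck=true");
        }
    }
}
```

Failing at submission time, rather than at runtime, means the user learns about the unsupported combination before any message is consumed.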
>
>
>
> On 2022/05/09 09:02:12 Baozi wrote:
>> Hi, guys:
>>
>> I found that the autoAck configuration in the function framework currently
>> affects delivery semantics, which makes the behavior difficult for users to
>> understand. Consider the following two scenarios.
>>
>> 1. If the user expects the Guarantees setting to determine the semantics
>>
>> If the user sets Guarantees == ATMOST_ONCE and autoAck == false, the actual
>> processing semantics of the function become ATLEAST_ONCE. As the following
>> code shows, the message is not acked immediately in this scenario.
>>
>> JavaInstanceRunnable#run(): Line 273
>> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L271-L276>
>> if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
>>         == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
>>     if (instanceConfig.getFunctionDetails().getAutoAck()) { // ack only when autoAck == true
>>         currentRecord.ack();
>>     }
>> }
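The effect of this condition can be seen in a toy model. `Guarantee`, `ScenarioOne`, and the single-message redelivery loop are hypothetical simplifications, not Pulsar code: the model assumes the first processing attempt fails and that any unacked message is redelivered. With autoAck == true the message is acked on receipt and therefore lost after the failure (at most once); with autoAck == false it stays unacked and is delivered again (at least once).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical mirror of the ProcessingGuarantees values; not the Pulsar proto enum.
enum Guarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

final class ScenarioOne {
    private ScenarioOne() {}

    // Mirrors the quoted condition: the runtime acks on receipt only for
    // ATMOST_ONCE combined with autoAck == true.
    static boolean ackedOnReceipt(Guarantee guarantee, boolean autoAck) {
        return guarantee == Guarantee.ATMOST_ONCE && autoAck;
    }

    // Toy broker with one message whose first processing attempt fails.
    // A message that was not acked before the failure is redelivered.
    static int deliveryCount(Guarantee guarantee, boolean autoAck) {
        Deque<String> pending = new ArrayDeque<>(List.of("m1"));
        int deliveries = 0;
        boolean firstAttemptFails = true;
        while (!pending.isEmpty()) {
            String msg = pending.poll();
            deliveries++;
            if (firstAttemptFails) {
                firstAttemptFails = false;
                if (!ackedOnReceipt(guarantee, autoAck)) {
                    pending.add(msg); // never acked, so the broker redelivers it
                }
            }
            // Successful attempts are assumed to end with an ack (by whichever
            // component is responsible), so nothing is re-queued.
        }
        return deliveries;
    }
}
```

For example, `deliveryCount(Guarantee.ATMOST_ONCE, false)` counts two deliveries of the same message, which is at-least-once behavior despite the ATMOST_ONCE setting.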
>>
>> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
>> false
>>
>> According to the following code, the framework still acks messages automatically.
>>
>> PulsarSinkAtLeastOnceProcessor#sendOutputMessage(): Line 275
>> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L274-L276>
>> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage(): Line 325
>> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L325>
>>
>> public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
>>     msg.sendAsync()
>>             .thenAccept(messageId -> record.ack())
>>             .exceptionally(getPublishErrorHandler(record, true));
>> }
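The quoted snippet acks in the send callback without consulting autoAck. The self-contained sketch below reproduces that control flow using only `java.util.concurrent.CompletableFuture`. `TrackedRecord`, `SinkCallbackSketch`, and the unused `autoAck` parameter are hypothetical; the parameter exists only to make visible that the flag plays no role, and the `exceptionally` branch is assumed to fail the record as a stand-in for `getPublishErrorHandler(record, true)`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical record that remembers whether it was acked or failed.
class TrackedRecord {
    volatile boolean acked;
    volatile boolean failed;

    void ack() { acked = true; }

    void fail() { failed = true; }
}

final class SinkCallbackSketch {
    private SinkCallbackSketch() {}

    // Mirrors the quoted sendOutputMessage: ack in the success callback.
    // autoAck is deliberately ignored, as it is in the quoted code.
    static CompletableFuture<Void> sendOutputMessage(
            CompletableFuture<String> sendResult, TrackedRecord record, boolean autoAck) {
        return sendResult
                .thenAccept(messageId -> record.ack())
                .exceptionally(ex -> {
                    record.fail(); // assumed stand-in for getPublishErrorHandler(record, true)
                    return null;
                });
    }
}
```

Passing `autoAck = false` with a successfully completed send still produces an acked record, which is exactly the behavior described in scenario 2.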
>>
>> To sum up, users may be confused when configuring Guarantees and autoAck,
>> and cannot easily tell what behavior to expect.
>>
>> I would like to discuss whether we can remove the autoAck configuration
>> and add a CUSTOM type to Guarantees.
>>
>> switch (processingGuarantees) {
>>     case ATMOST_ONCE:      // the framework acks immediately after consuming the message
>>     case ATLEAST_ONCE:     // the framework acks only after processing completes
>>     case EFFECTIVELY_ONCE: // the framework acks only after processing completes
>>     case CUSTOM:           // the framework performs no acks and gives no semantic guarantees
>> }
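The proposed CUSTOM option could be modeled as a plain enum plus a policy method that states when, if ever, the framework acks. This is a hypothetical sketch of the proposal only; `ProposedGuarantee`, `AckPoint`, and `ProposedAckPolicy.frameworkAckPoint` are illustrative names, not part of Pulsar.

```java
// Hypothetical sketch of the proposal: autoAck removed, CUSTOM added.
enum ProposedGuarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE, CUSTOM }

// When the framework acks a message under each guarantee.
enum AckPoint { ON_RECEIVE, AFTER_PROCESSING, NEVER }

final class ProposedAckPolicy {
    private ProposedAckPolicy() {}

    static AckPoint frameworkAckPoint(ProposedGuarantee guarantee) {
        switch (guarantee) {
            case ATMOST_ONCE:
                return AckPoint.ON_RECEIVE;       // ack immediately after consuming
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                return AckPoint.AFTER_PROCESSING; // ack only after processing completes
            case CUSTOM:
                return AckPoint.NEVER;            // user code is responsible for acking
            default:
                throw new IllegalArgumentException("unknown guarantee: " + guarantee);
        }
    }
}
```

With a single setting driving every ack decision, the confusing Guarantees/autoAck combinations described in the two scenarios simply cannot be expressed.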
>>
>> If you have any ideas, feel free to join the discussion. If everyone agrees
>> with this idea, I will submit a PIP to drive the implementation.
>>
>> Thanks,
>> Baodi Shi