Thanks for this detailed discussion about processing guarantees and acking. Together, these two settings affect the behavior of a running function.
One thing I want to clarify: the AUTO_ACK setting determines whether the function runtime acks messages. ("Function runtime" here refers specifically to JavaInstanceRunnable. If the ack happens inside a sink's implementation of the write method, it is not an auto-ack.) If AUTO_ACK is TRUE, JavaInstanceRunnable acks the messages. If AUTO_ACK is FALSE, acking is done by the Sink implementation.

With this context, let's review your two scenarios:

> 1. If the user sets Guarantees == ATMOST_ONCE and autoAck == false.

To be precise, the processing semantics is not ATLEAST_ONCE. It is left to the Sink implementation to decide which semantics apply: ATMOST_ONCE, ATLEAST_ONCE, or possibly EFFECTIVELY_ONCE.

> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
> false

Given the context above, this behavior is actually correct.

The truly problematic scenario is when the user sets ATLEAST_ONCE/EFFECTIVELY_ONCE together with AUTO_ACK=true. I don't think JavaInstanceRunnable can ack on the user's behalf in these cases, so there should be a check that prevents users from submitting functions with such configurations.

On 2022/05/09 09:02:12 Baozi wrote:
> Hi, guys:
>
> I found out that the autoAck configuration in the function framework now
> affects delivery semantics, which makes it difficult for users to understand.
> Refer to the following two scenarios.
>
> 1. If the user understands that the semantics of Guarantees shall prevail
>
> If the user sets Guarantees == ATMOST_ONCE and autoAck == false, then the
> processing semantics of the actual Function will become ATLEAST_ONCE. As the
> following code shows, the message is not acked immediately in this scenario.
>
> JavaInstanceRunnable#run():Line273
> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L271-L276>
>
>     if (instanceConfig.getFunctionDetails().getProcessingGuarantees() ==
>             org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
>         if (instanceConfig.getFunctionDetails().getAutoAck()) { // auto ack only when autoAck == true
>             currentRecord.ack();
>         }
>     }
>
> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
> false
>
> According to the following code, the framework still acks automatically.
>
> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275
> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L274-L276>
> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325
> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L325>
>
>     public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
>         msg.sendAsync()
>                 .thenAccept(messageId -> record.ack())
>                 .exceptionally(getPublishErrorHandler(record, true));
>     }
>
> To sum up, users may be confused when configuring Guarantees and autoAck, and
> cannot tell which behavior to expect.
>
> I would like to discuss whether it is possible to remove the autoAck
> configuration and add a CUSTOM type for Guarantees.
>
>     switch (processingGuarantees) {
>         case ATMOST_ONCE:
>             // The framework acks the message immediately after consuming it.
>         case ATLEAST_ONCE:
>             // The ack is performed after processing on the source side.
>         case EFFECTIVELY_ONCE:
>             // The ack is performed after processing on the source side.
>         case CUSTOM:
>             // The function framework performs no ack operations and
>             // provides no semantic guarantees for the user.
>     }
>
> Any ideas are welcome. If everyone agrees with this idea, I will submit a
> PIP to drive the implementation.
>
> Thanks,
> Baodi Shi
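To make the two options in this thread concrete, here is a minimal Java sketch. It uses a local stand-in enum rather than Pulsar's own ProcessingGuarantees (CUSTOM is only the proposed value), and validate() and ackPoint() are hypothetical helpers, not existing Pulsar APIs. validate() models the check suggested above for the current autoAck setting; ackPoint() maps each guarantee to when the framework would ack under Baozi's proposal.

```java
public class AckConfigSketch {
    // Local stand-in for the guarantee values discussed in the thread.
    // CUSTOM is the value proposed by Baozi; it is not part of Pulsar today.
    public enum Guarantees { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE, CUSTOM }

    // Hypothetical description of when the framework acks under the proposal.
    public enum AckPoint { AFTER_CONSUME, AFTER_PROCESSING, NEVER }

    // Option 1 (the reply): keep autoAck, but reject configurations where
    // JavaInstanceRunnable would be asked to ack although the guarantee only
    // allows acking after the record has been handled downstream.
    // Hypothetical helper, not an existing Pulsar method.
    public static void validate(Guarantees guarantees, boolean autoAck) {
        boolean needsDeferredAck = guarantees == Guarantees.ATLEAST_ONCE
                || guarantees == Guarantees.EFFECTIVELY_ONCE;
        if (autoAck && needsDeferredAck) {
            throw new IllegalArgumentException(
                    "autoAck=true cannot be combined with " + guarantees);
        }
    }

    // Option 2 (the proposal): drop autoAck and derive the framework's ack
    // point from the guarantee alone. Hypothetical helper, not a Pulsar API.
    public static AckPoint ackPoint(Guarantees guarantees) {
        switch (guarantees) {
            case ATMOST_ONCE:
                return AckPoint.AFTER_CONSUME;    // ack right after consuming
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                return AckPoint.AFTER_PROCESSING; // ack once processing completes
            case CUSTOM:
            default:
                return AckPoint.NEVER;            // acking is left to the user
        }
    }

    public static void main(String[] args) {
        validate(Guarantees.ATMOST_ONCE, true);   // accepted
        validate(Guarantees.ATLEAST_ONCE, false); // accepted: the sink decides when to ack
        try {
            validate(Guarantees.EFFECTIVELY_ONCE, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        for (Guarantees g : Guarantees.values()) {
            System.out.println(g + " -> " + ackPoint(g));
        }
    }
}
```

The first option keeps the existing knob and only forbids the contradictory combinations; the second removes the knob, so the guarantee alone tells the user who acks and when.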