> For users, sink is also part of the function framework.
^^ Is this written anywhere in the Pulsar documentation? If you look at the code
closely, the source and sink are actually configurable in the Java runtime. Users
can provide their own source/sink implementations.
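
To make the distinction concrete, here is a minimal sketch of a user-provided
sink that acks inside its own write method. Everything in it is a hypothetical
stand-in declared locally (DemoRecord, DemoSink, DemoRuntime and so on); it does
not use the real Pulsar interfaces and is only meant to show who performs the
ack when autoAck is false.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record; NOT the real Pulsar Record interface.
interface DemoRecord {
    String value();
    void ack();
    void fail();
}

// Hypothetical stand-in for a user-provided sink.
interface DemoSink {
    void write(DemoRecord record);
}

// Record that counts how often it was acked or failed.
final class CountingRecord implements DemoRecord {
    private final String value;
    int acks = 0;
    int fails = 0;

    CountingRecord(String value) { this.value = value; }

    public String value() { return value; }
    public void ack() { acks++; }
    public void fail() { fails++; }
}

// A sink that acks inside write(): the ack is the sink's decision,
// not the runtime's, so it is not "auto-ack".
final class SelfAckingSink implements DemoSink {
    final List<String> written = new ArrayList<>();

    public void write(DemoRecord record) {
        try {
            written.add(record.value()); // stands in for the external write
            record.ack();
        } catch (RuntimeException e) {
            record.fail();
        }
    }
}

// Stand-in for the runtime loop: it acks only when autoAck is true.
final class DemoRuntime {
    static void process(DemoRecord record, DemoSink sink, boolean autoAck) {
        sink.write(record);
        if (autoAck) {
            record.ack(); // "auto-ack" in the narrow sense used above
        }
    }
}

final class AckDemo {
    public static void main(String[] args) {
        CountingRecord r = new CountingRecord("hello");
        DemoRuntime.process(r, new SelfAckingSink(), false);
        System.out.println("acks with autoAck=false: " + r.acks);
    }
}
```

With autoAck set to false the record is still acked once, but by the sink
rather than by the runtime stand-in.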
On 2022/05/10 01:38:48 Baozi wrote:
> > The AUTO_ACK setting determines whether the function runtime acks messages.
> > ("function runtime" here specifically refers to the JavaInstanceRunnable.
> > If the ack happens inside a sink's implemented write method, it's not
> > auto-ack).
> The description in the official website documentation is: "Whether or not the
> framework acknowledges messages automatically."
> For users, sink is also part of the function framework.
>
>
> Thanks,
> Baodi Shi
>
> > On May 10, 2022, 09:1407, Baozi <[email protected]> wrote:
> >
> > Thanks for this detailed discussion about processing guarantees and acks.
> > Together, these two settings affect the behavior of a running
> > function.
> >
> > One thing I want to clarify is:
> > The AUTO_ACK setting determines whether the function runtime acks messages.
> > ("function runtime" here specifically refers to the JavaInstanceRunnable.
> > If the ack happens inside a sink's implemented write method, it's not
> > auto-ack).
> >
> > If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> > If AUTO_ACK is FALSE, then the acking will be done by Sink implementation.
> >
> > Now with this context, let's review your two scenarios:
> >
> >> 1.If the user set Guarantees == ATMOST_ONCE and autoAck == false.
> > To be precise, the processing semantics are not ATLEAST_ONCE. It is actually
> > left to the implemented Sink to decide which semantics apply. They can be
> > ATMOST_ONCE, ATLEAST_ONCE, and probably EFFECTIVELY_ONCE.
> >
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
> >> false
> > This behavior is actually correct based on our previous context.
> >
> > The really problematic scenario is when the user sets
> > ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the
> > JavaInstanceRunnable can ack for the user in these cases, so there should be
> > a check that prevents users from submitting functions with such configs.
> >
> >
> >
> > On 2022/05/09 09:02:12 Baozi wrote:
> >> Hi, guys:
> >>
> >> I found that the autoAck configuration in the function framework now
> >> affects delivery semantics, which makes it difficult for users to
> >> understand. Consider the following two scenarios.
> >>
> >> 1. If the user understands that the semantics of Guarantees shall prevail
> >>
> >> If the user sets Guarantees == ATMOST_ONCE and autoAck == false, then the
> >> actual processing semantics of the Function become ATLEAST_ONCE.
> >> As the following code shows, the message is not acked immediately in this
> >> scenario.
> >>
> >> JavaInstanceRunnable#run():Line273
> >> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L271-L276>
> >>
> >> if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
> >>         == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
> >>     // Ack immediately only when autoAck == true.
> >>     if (instanceConfig.getFunctionDetails().getAutoAck()) {
> >>         currentRecord.ack();
> >>     }
> >> }
> >>
> >> 2. If the user thinks that the framework doesn’t auto ack when autoAck ==
> >> false
> >>
> >> According to the following code, the framework still acks
> >> automatically.
> >>
> >> PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275
> >> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L274-L276>
> >>
> >> PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325
> >> <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L325>
> >>
> >> public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
> >>     msg.sendAsync()
> >>             .thenAccept(messageId -> record.ack())
> >>             .exceptionally(getPublishErrorHandler(record, true));
> >> }
> >>
> >> To sum up, users may be confused when configuring Guarantees and autoAck,
> >> and cannot judge their correct expected behavior.
> >>
> >> I would like to discuss whether it is possible to remove the autoAck
> >> configuration and add a CUSTOM type for Guarantees.
> >>
> >> switch (processingGuarantees) {
> >>     case Guarantees.ATMOST_ONCE:
> >>         // The framework acks immediately after consuming the message.
> >>     case Guarantees.ATLEAST_ONCE:
> >>         // The ack is performed after processing on the source side.
> >>     case Guarantees.EFFECTIVELY_ONCE:
> >>         // The ack is performed after processing on the source side.
> >>     case Guarantees.CUSTOM:
> >>         // The framework performs no ack operations and gives no semantic guarantees.
> >> }
> >>
> >> If you have any ideas, you are welcome to discuss them. If everyone agrees
> >> with this idea, I will submit a PIP to drive the implementation.
> >>
> >> Thanks,
> >> Baodi Shi
>
>
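
For reference, the proposal in the original message (drop autoAck and add a
CUSTOM guarantee) could be sketched roughly as the mapping below. This is only
an illustration under assumptions: DemoGuarantee, FrameworkAck and
ProposedAckPolicy are hypothetical names declared here, not existing Pulsar
types, and the mapping simply restates the four cases from the pseudocode.

```java
// Hypothetical enum mirroring the proposed Guarantees values.
enum DemoGuarantee { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE, CUSTOM }

// Hypothetical description of when (if ever) the framework acks.
enum FrameworkAck { ON_CONSUME, AFTER_PROCESSING, NONE }

final class ProposedAckPolicy {
    // Maps each guarantee to the framework's ack behavior as proposed.
    static FrameworkAck ackFor(DemoGuarantee g) {
        switch (g) {
            case ATMOST_ONCE:
                return FrameworkAck.ON_CONSUME;       // ack right after consuming
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                return FrameworkAck.AFTER_PROCESSING; // ack once processing is done
            case CUSTOM:
                return FrameworkAck.NONE;             // the user handles acks
            default:
                throw new IllegalArgumentException("unknown guarantee: " + g);
        }
    }
}

final class ProposedAckPolicyDemo {
    public static void main(String[] args) {
        for (DemoGuarantee g : DemoGuarantee.values()) {
            System.out.println(g + " -> " + ProposedAckPolicy.ackFor(g));
        }
    }
}
```

With a single setting like this, the ack behavior follows from the guarantee
alone, so the ambiguous combinations discussed above could not be configured.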