Hi, guys:

I found that the autoAck configuration in the function framework now affects
the delivery semantics, which makes the behavior difficult for users to
understand. Consider the following two scenarios.

1. The user assumes that the Guarantees setting takes precedence

If the user sets Guarantees == ATMOST_ONCE and autoAck == false, the actual
processing semantics of the Function become ATLEAST_ONCE. As the following
code shows, the message is not acked immediately in this case.

JavaInstanceRunnable#run():Line273  
<https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L271-L276>
if (instanceConfig.getFunctionDetails().getProcessingGuarantees()
        == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
    // auto ack only when autoAck == true
    if (instanceConfig.getFunctionDetails().getAutoAck()) {
        currentRecord.ack();
    }
}

2. The user assumes that the framework does not auto ack when autoAck == false

As the following code shows, the framework still acks automatically.

PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 
<https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L274-L276>
PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 
<https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L325>

public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
    msg.sendAsync()
            .thenAccept(messageId -> record.ack()) 
            .exceptionally(getPublishErrorHandler(record, true));
}

To sum up, users are likely to be confused when configuring Guarantees and
autoAck together, and cannot tell which behavior to expect.
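
To make the interaction easier to see, here is a minimal sketch that models
only the two code paths quoted in this mail. The class AckMatrix, the enum
FrameworkAck and the method frameworkAck() are hypothetical names made up for
illustration; they are not Pulsar classes, and other code paths in the
framework are not covered.

```java
// Hypothetical model of the two snippets quoted above; not actual Pulsar code.
public class AckMatrix {
    enum Guarantees { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

    // When the framework itself acks, as far as the quoted snippets show.
    enum FrameworkAck { BEFORE_PROCESSING, AFTER_SINK_SEND, NONE_SHOWN }

    static FrameworkAck frameworkAck(Guarantees guarantees, boolean autoAck) {
        switch (guarantees) {
            case ATMOST_ONCE:
                // JavaInstanceRunnable#run(): acks up front only when autoAck == true.
                return autoAck ? FrameworkAck.BEFORE_PROCESSING : FrameworkAck.NONE_SHOWN;
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                // PulsarSink processors: ack after sendAsync() completes;
                // autoAck is not consulted in the quoted code.
                return FrameworkAck.AFTER_SINK_SEND;
            default:
                throw new IllegalArgumentException("unknown guarantee: " + guarantees);
        }
    }

    public static void main(String[] args) {
        for (Guarantees g : Guarantees.values()) {
            for (boolean autoAck : new boolean[] {true, false}) {
                System.out.println(g + ", autoAck=" + autoAck
                        + " -> " + frameworkAck(g, autoAck));
            }
        }
    }
}
```

In this model, autoAck == false changes the outcome for ATMOST_ONCE but has no
effect for the other two guarantees, which is the inconsistency described in
scenarios 1 and 2.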

I would like to discuss whether we could remove the autoAck configuration and
add a CUSTOM type to Guarantees.

switch (processingGuarantees) {
    case ATMOST_ONCE:
        // The framework acks the message immediately after consuming it.
        break;
    case ATLEAST_ONCE:
        // The framework acks after processing on the source side has finished.
        break;
    case EFFECTIVELY_ONCE:
        // The framework acks after processing on the source side has finished.
        break;
    case CUSTOM:
        // The framework performs no acks and provides no semantic guarantees;
        // the user is responsible for both.
        break;
}
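
As a rough illustration of the proposal, the sketch below shows how a single
Guarantees value could drive all ack decisions once autoAck is gone. Everything
in it is hypothetical: ProposedAckPolicy, FakeRecord and handle() are made-up
names, CUSTOM does not exist in Pulsar today, and the real framework acks
asynchronously through the sink rather than inline as shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposal; none of these types exist in Pulsar.
public class ProposedAckPolicy {
    enum Guarantees { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE, CUSTOM }

    /** Minimal stand-in for a consumed record; it only logs what happened to it. */
    static class FakeRecord {
        final List<String> events = new ArrayList<>();
        void ack() { events.add("ack"); }
    }

    static void handle(Guarantees guarantees, FakeRecord record, Runnable userFunction) {
        if (guarantees == Guarantees.ATMOST_ONCE) {
            record.ack(); // ack right after consuming, before processing
        }
        userFunction.run();
        if (guarantees == Guarantees.ATLEAST_ONCE
                || guarantees == Guarantees.EFFECTIVELY_ONCE) {
            record.ack(); // ack only after processing has finished
        }
        // CUSTOM: the framework never acks; the user code is responsible.
    }

    public static void main(String[] args) {
        for (Guarantees g : Guarantees.values()) {
            FakeRecord record = new FakeRecord();
            handle(g, record, () -> record.events.add("process"));
            System.out.println(g + " -> " + record.events);
        }
        // ATMOST_ONCE -> [ack, process]
        // ATLEAST_ONCE -> [process, ack]
        // EFFECTIVELY_ONCE -> [process, ack]
        // CUSTOM -> [process]
    }
}
```

With this shape, a user who wants manual acking selects CUSTOM explicitly
instead of combining a guarantee with autoAck == false.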

If you have any thoughts, please share them. If everyone agrees with this
idea, I will open a PIP to move the implementation forward.

Thanks,
Baodi Shi
