This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a5c1029668e [fix][fn] Fix Go function runtime to continue after user exceptions and add neg-ack tests (#25867)
a5c1029668e is described below

commit a5c1029668e182b05579d223683d5e6ea82f84fe
Author: Dream95 <[email protected]>
AuthorDate: Mon May 25 20:28:29 2026 +0800

    [fix][fn] Fix Go function runtime to continue after user exceptions and add neg-ack tests (#25867)
    
    Signed-off-by: Dream95 <[email protected]>
---
 pulsar-function-go/pf/instance.go                  | 41 +++++++++++++++-------
 pulsar-function-go/pf/instance_test.go             | 32 +++++++++++++++++
 .../docker-images/latest-version-image/Dockerfile  |  1 +
 .../go-examples/exceptionFunc/exceptionFunc.go     | 41 ++++++++++++++++++++++
 .../integration/functions/PulsarFunctionsTest.java |  4 +++
 .../functions/PulsarFunctionsTestBase.java         |  1 +
 .../functions/go/PulsarFunctionsGoTest.java        |  5 +++
 7 files changed, 112 insertions(+), 13 deletions(-)

diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
index af8a4e0157b..2cdfc8a6e94 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -164,7 +164,6 @@ CLOSE:
                case cm := <-channel:
                        msgInput := cm.Message
                        atMostOnce := 
gi.context.instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_ATMOST_ONCE
-                       atLeastOnce := 
gi.context.instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_ATLEAST_ONCE
                        autoAck := gi.context.instanceConf.funcDetails.AutoAck 
//nolint:staticcheck
                        if autoAck && atMostOnce {
                                gi.ackInputMessage(msgInput)
@@ -177,12 +176,8 @@ CLOSE:
 
                        output, err := gi.handlerMsg(msgInput)
                        if err != nil {
-                               log.Errorf("handler message error:%v", err)
-                               if autoAck && atLeastOnce {
-                                       gi.nackInputMessage(msgInput)
-                               }
-                               gi.stats.incrTotalUserExceptions(err)
-                               return err
+                               gi.handleUserError(msgInput, err)
+                               continue
                        }
 
                        gi.stats.processTimeEnd()
@@ -391,6 +386,29 @@ func (gi *goInstance) setupConsumer() (chan 
pulsar.ConsumerMessage, error) {
        return channel, nil
 }
 
+func (gi *goInstance) shouldNackInputOnFailure() bool {
+       guarantee := gi.context.instanceConf.funcDetails.ProcessingGuarantees
+       return guarantee == pb.ProcessingGuarantees_ATLEAST_ONCE ||
+               guarantee == pb.ProcessingGuarantees_MANUAL
+}
+
+func (gi *goInstance) handleUserError(msgInput pulsar.Message, err error) {
+       log.Errorf("handler message error:%v", err)
+       if gi.shouldNackInputOnFailure() {
+               gi.nackInputMessage(msgInput)
+       }
+       gi.stats.incrTotalUserExceptions(err)
+       gi.stats.processTimeEnd()
+}
+
+func (gi *goInstance) handlePublishError(msgInput pulsar.Message, err error) {
+       if gi.context.instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_ATLEAST_ONCE {
+               gi.nackInputMessage(msgInput)
+       }
+       gi.stats.incrTotalSysExceptions(err)
+       log.Errorf("failed to publish output message: %v", err)
+}
+
 func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err 
error) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
@@ -420,11 +438,8 @@ func (gi *goInstance) processResult(msgInput 
pulsar.Message, output []byte) {
                                // semantics, ensure we nack so someone else 
can get it, in case we are the only handler. Then mark
                                // exception and fail out.
                                if err != nil {
-                                       if autoAck && atLeastOnce {
-                                               gi.nackInputMessage(msgInput)
-                                       }
-                                       gi.stats.incrTotalSysExceptions(err)
-                                       log.Fatal(err)
+                                       gi.handlePublishError(msgInput, err)
+                                       return
                                }
                                // Otherwise the message succeeded. If the SDK 
is entrusted with responding and we are using
                                // atLeastOnce delivery semantics, ack the 
message.
@@ -437,7 +452,7 @@ func (gi *goInstance) processResult(msgInput 
pulsar.Message, output []byte) {
                return
        }
 
-       // No output from the function or no output topic. Ack if we need to 
and mark the success before rturning.
+       // No output from the function or no output topic. Ack if we need to 
and mark the success before returning.
        if autoAck && atLeastOnce {
                gi.ackInputMessage(msgInput)
        }
diff --git a/pulsar-function-go/pf/instance_test.go 
b/pulsar-function-go/pf/instance_test.go
index bf45ae3a891..447d3a22f8d 100644
--- a/pulsar-function-go/pf/instance_test.go
+++ b/pulsar-function-go/pf/instance_test.go
@@ -27,6 +27,8 @@ import (
        "time"
 
        "github.com/stretchr/testify/assert"
+
+       pb "github.com/apache/pulsar/pulsar-function-go/pb"
 )
 
 func testProcessSpawnerHealthCheckTimer(
@@ -115,3 +117,33 @@ func Test_goInstance_handlerMsg(t *testing.T) {
        assert.Equal(t, "output", string(output))
        assert.Equal(t, message, fc.record)
 }
+
+func newTestGoInstance(guarantee pb.ProcessingGuarantees) *goInstance {
+       return &goInstance{
+               context: &FunctionContext{
+                       instanceConf: &instanceConf{
+                               funcDetails: pb.FunctionDetails{
+                                       ProcessingGuarantees: guarantee,
+                               },
+                       },
+               },
+       }
+}
+
+func TestShouldNackInputOnFailure(t *testing.T) {
+       tests := []struct {
+               name      string
+               guarantee pb.ProcessingGuarantees
+               want      bool
+       }{
+               {"atLeastOnce", pb.ProcessingGuarantees_ATLEAST_ONCE, true},
+               {"manual", pb.ProcessingGuarantees_MANUAL, true},
+               {"atMostOnce", pb.ProcessingGuarantees_ATMOST_ONCE, false},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       instance := newTestGoInstance(tt.guarantee)
+                       assert.Equal(t, tt.want, 
instance.shouldNackInputOnFailure())
+               })
+       }
+}
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/Dockerfile
index 19f5cd5e28d..4867bfc21c4 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -24,6 +24,7 @@ ARG GOLANG_IMAGE
 FROM $GOLANG_IMAGE as pulsar-function-go
 
 COPY target/pulsar-function-go/ 
/go/src/github.com/apache/pulsar/pulsar-function-go
+COPY go-examples/exceptionFunc/ 
/go/src/github.com/apache/pulsar/pulsar-function-go/examples/exceptionFunc/
 RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go && go install ./...
 RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/pf && go install
 RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/examples && go 
install ./...
diff --git 
a/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go
 
b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go
new file mode 100644
index 00000000000..80ace6e21b9
--- /dev/null
+++ 
b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go
@@ -0,0 +1,41 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+       "context"
+       "errors"
+
+       "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+var i int
+
+func HandleException(ctx context.Context, in []byte) ([]byte, error) {
+       i++
+       if i%10 == 0 {
+               return nil, errors.New("test")
+       }
+       return []byte(string(in) + "!"), nil
+}
+
+func main() {
+       pf.Start(HandleException)
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 86dc71f16fb..419482cea97 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -410,6 +410,10 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             submitFunction(
                     runtime, inputTopicName, outputTopicName, functionName, 
EXCEPTION_FUNCTION_PYTHON_FILE,
                     EXCEPTION_PYTHON_CLASS, schema, null);
+        } else if (runtime == Runtime.GO) {
+            submitFunction(
+                    runtime, inputTopicName, outputTopicName, functionName, 
EXCEPTION_GO_FILE,
+                    null, schema, null);
         } else {
             submitFunction(
                     runtime, inputTopicName, outputTopicName, functionName, 
null, EXCEPTION_JAVA_CLASS, schema, null);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 890b0eab0d8..c041f1bcf73 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -78,6 +78,7 @@ public abstract class PulsarFunctionsTestBase extends 
PulsarTestSuite {
 
     public static final String EXCLAMATION_GO_FILE = "exclamationFunc";
     public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc";
+    public static final String EXCEPTION_GO_FILE = "exceptionFunc";
 
     public static final String LOGGING_JAVA_CLASS =
             "org.apache.pulsar.functions.api.examples.LoggingFunction";
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
index 0550fd94ebe..6e631adafbf 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
@@ -39,4 +39,9 @@ public abstract class PulsarFunctionsGoTest extends 
PulsarFunctionsTest {
         testExclamationFunction(Runtime.GO, false, false, true, false);
     }
 
+    @Test(groups = {"go_function", "function"})
+    public void testGoFunctionNegAck() throws Exception {
+        testFunctionNegAck(Runtime.GO);
+    }
+
 }

Reply via email to