This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a5c1029668e [fix][fn] Fix Go function runtime to continue after user
exceptions and add neg-ack tests (#25867)
a5c1029668e is described below
commit a5c1029668e182b05579d223683d5e6ea82f84fe
Author: Dream95 <[email protected]>
AuthorDate: Mon May 25 20:28:29 2026 +0800
[fix][fn] Fix Go function runtime to continue after user exceptions and add
neg-ack tests (#25867)
Signed-off-by: Dream95 <[email protected]>
---
pulsar-function-go/pf/instance.go | 41 +++++++++++++++-------
pulsar-function-go/pf/instance_test.go | 32 +++++++++++++++++
.../docker-images/latest-version-image/Dockerfile | 1 +
.../go-examples/exceptionFunc/exceptionFunc.go | 41 ++++++++++++++++++++++
.../integration/functions/PulsarFunctionsTest.java | 4 +++
.../functions/PulsarFunctionsTestBase.java | 1 +
.../functions/go/PulsarFunctionsGoTest.java | 5 +++
7 files changed, 112 insertions(+), 13 deletions(-)
diff --git a/pulsar-function-go/pf/instance.go
b/pulsar-function-go/pf/instance.go
index af8a4e0157b..2cdfc8a6e94 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -164,7 +164,6 @@ CLOSE:
case cm := <-channel:
msgInput := cm.Message
atMostOnce :=
gi.context.instanceConf.funcDetails.ProcessingGuarantees ==
pb.ProcessingGuarantees_ATMOST_ONCE
- atLeastOnce :=
gi.context.instanceConf.funcDetails.ProcessingGuarantees ==
pb.ProcessingGuarantees_ATLEAST_ONCE
autoAck := gi.context.instanceConf.funcDetails.AutoAck
//nolint:staticcheck
if autoAck && atMostOnce {
gi.ackInputMessage(msgInput)
@@ -177,12 +176,8 @@ CLOSE:
output, err := gi.handlerMsg(msgInput)
if err != nil {
- log.Errorf("handler message error:%v", err)
- if autoAck && atLeastOnce {
- gi.nackInputMessage(msgInput)
- }
- gi.stats.incrTotalUserExceptions(err)
- return err
+ gi.handleUserError(msgInput, err)
+ continue
}
gi.stats.processTimeEnd()
@@ -391,6 +386,29 @@ func (gi *goInstance) setupConsumer() (chan
pulsar.ConsumerMessage, error) {
return channel, nil
}
+func (gi *goInstance) shouldNackInputOnFailure() bool {
+ guarantee := gi.context.instanceConf.funcDetails.ProcessingGuarantees
+ return guarantee == pb.ProcessingGuarantees_ATLEAST_ONCE ||
+ guarantee == pb.ProcessingGuarantees_MANUAL
+}
+
+func (gi *goInstance) handleUserError(msgInput pulsar.Message, err error) {
+ log.Errorf("handler message error:%v", err)
+ if gi.shouldNackInputOnFailure() {
+ gi.nackInputMessage(msgInput)
+ }
+ gi.stats.incrTotalUserExceptions(err)
+ gi.stats.processTimeEnd()
+}
+
+func (gi *goInstance) handlePublishError(msgInput pulsar.Message, err error) {
+ if gi.context.instanceConf.funcDetails.ProcessingGuarantees ==
pb.ProcessingGuarantees_ATLEAST_ONCE {
+ gi.nackInputMessage(msgInput)
+ }
+ gi.stats.incrTotalSysExceptions(err)
+ log.Errorf("failed to publish output message: %v", err)
+}
+
func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err
error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -420,11 +438,8 @@ func (gi *goInstance) processResult(msgInput
pulsar.Message, output []byte) {
// semantics, ensure we nack so someone else
can get it, in case we are the only handler. Then mark
// exception and fail out.
if err != nil {
- if autoAck && atLeastOnce {
- gi.nackInputMessage(msgInput)
- }
- gi.stats.incrTotalSysExceptions(err)
- log.Fatal(err)
+ gi.handlePublishError(msgInput, err)
+ return
}
// Otherwise the message succeeded. If the SDK
is entrusted with responding and we are using
// atLeastOnce delivery semantics, ack the
message.
@@ -437,7 +452,7 @@ func (gi *goInstance) processResult(msgInput
pulsar.Message, output []byte) {
return
}
- // No output from the function or no output topic. Ack if we need to
and mark the success before rturning.
+ // No output from the function or no output topic. Ack if we need to
and mark the success before returning.
if autoAck && atLeastOnce {
gi.ackInputMessage(msgInput)
}
diff --git a/pulsar-function-go/pf/instance_test.go
b/pulsar-function-go/pf/instance_test.go
index bf45ae3a891..447d3a22f8d 100644
--- a/pulsar-function-go/pf/instance_test.go
+++ b/pulsar-function-go/pf/instance_test.go
@@ -27,6 +27,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
+
+ pb "github.com/apache/pulsar/pulsar-function-go/pb"
)
func testProcessSpawnerHealthCheckTimer(
@@ -115,3 +117,33 @@ func Test_goInstance_handlerMsg(t *testing.T) {
assert.Equal(t, "output", string(output))
assert.Equal(t, message, fc.record)
}
+
+func newTestGoInstance(guarantee pb.ProcessingGuarantees) *goInstance {
+ return &goInstance{
+ context: &FunctionContext{
+ instanceConf: &instanceConf{
+ funcDetails: pb.FunctionDetails{
+ ProcessingGuarantees: guarantee,
+ },
+ },
+ },
+ }
+}
+
+func TestShouldNackInputOnFailure(t *testing.T) {
+ tests := []struct {
+ name string
+ guarantee pb.ProcessingGuarantees
+ want bool
+ }{
+ {"atLeastOnce", pb.ProcessingGuarantees_ATLEAST_ONCE, true},
+ {"manual", pb.ProcessingGuarantees_MANUAL, true},
+ {"atMostOnce", pb.ProcessingGuarantees_ATMOST_ONCE, false},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ instance := newTestGoInstance(tt.guarantee)
+ assert.Equal(t, tt.want,
instance.shouldNackInputOnFailure())
+ })
+ }
+}
diff --git a/tests/docker-images/latest-version-image/Dockerfile
b/tests/docker-images/latest-version-image/Dockerfile
index 19f5cd5e28d..4867bfc21c4 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -24,6 +24,7 @@ ARG GOLANG_IMAGE
FROM $GOLANG_IMAGE as pulsar-function-go
COPY target/pulsar-function-go/
/go/src/github.com/apache/pulsar/pulsar-function-go
+COPY go-examples/exceptionFunc/
/go/src/github.com/apache/pulsar/pulsar-function-go/examples/exceptionFunc/
RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go && go install ./...
RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/pf && go install
RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/examples && go
install ./...
diff --git
a/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go
b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go
new file mode 100644
index 00000000000..80ace6e21b9
--- /dev/null
+++
b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go
@@ -0,0 +1,41 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+ "context"
+ "errors"
+
+ "github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+var i int
+
+func HandleException(ctx context.Context, in []byte) ([]byte, error) {
+ i++
+ if i%10 == 0 {
+ return nil, errors.New("test")
+ }
+ return []byte(string(in) + "!"), nil
+}
+
+func main() {
+ pf.Start(HandleException)
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 86dc71f16fb..419482cea97 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -410,6 +410,10 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
submitFunction(
runtime, inputTopicName, outputTopicName, functionName,
EXCEPTION_FUNCTION_PYTHON_FILE,
EXCEPTION_PYTHON_CLASS, schema, null);
+ } else if (runtime == Runtime.GO) {
+ submitFunction(
+ runtime, inputTopicName, outputTopicName, functionName,
EXCEPTION_GO_FILE,
+ null, schema, null);
} else {
submitFunction(
runtime, inputTopicName, outputTopicName, functionName,
null, EXCEPTION_JAVA_CLASS, schema, null);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 890b0eab0d8..c041f1bcf73 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -78,6 +78,7 @@ public abstract class PulsarFunctionsTestBase extends
PulsarTestSuite {
public static final String EXCLAMATION_GO_FILE = "exclamationFunc";
public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc";
+ public static final String EXCEPTION_GO_FILE = "exceptionFunc";
public static final String LOGGING_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.LoggingFunction";
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
index 0550fd94ebe..6e631adafbf 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
@@ -39,4 +39,9 @@ public abstract class PulsarFunctionsGoTest extends
PulsarFunctionsTest {
testExclamationFunction(Runtime.GO, false, false, true, false);
}
+ @Test(groups = {"go_function", "function"})
+ public void testGoFunctionNegAck() throws Exception {
+ testFunctionNegAck(Runtime.GO);
+ }
+
}