This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 510ac3b230c [fix][fn] ack messages for window function when its result
is null (#23618)
510ac3b230c is described below
commit 510ac3b230c29bc409cc0a6004aee545af31e3b2
Author: jiangpengcheng <[email protected]>
AuthorDate: Fri Nov 22 09:05:01 2024 +0800
[fix][fn] ack messages for window function when its result is null (#23618)
(cherry picked from commit 024ff7574b55104b2460c7969eb127577bfb54dc)
---
.../functions/windowing/WindowFunctionExecutor.java | 7 +++++++
.../integration/functions/PulsarFunctionsTest.java | 20 +++++++++++++++++++-
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index c6ca4e65d33..1e492d74aa6 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -238,6 +238,13 @@ public class WindowFunctionExecutor<T, X> implements
Function<T, X> {
}
}
});
+ } else {
+ // When window function return null, needs to be acked directly.
+ if (windowConfig.getProcessingGuarantees() ==
WindowConfig.ProcessingGuarantees.ATLEAST_ONCE) {
+ for (Record<T> record : tuples) {
+ record.ack();
+ }
+ }
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index e6ba6acff83..9ecd46e777e 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
@@ -321,7 +322,17 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
.enableBatching(false)
.create();
- for (int i = 0; i < NUM_OF_MESSAGES; i++) {
+ // send 3 messages first, and it won't trigger the window and so these
3 messages will not be acked
+ for (int i = 0; i < 3; i++) {
+ producer.send(String.format("%d", i).getBytes());
+ }
+ TopicStats stats = pulsarAdmin.topics().getStats(inputTopicName, true);
+ SubscriptionStats subStats =
stats.getSubscriptions().get("public/default/" + functionName);
+ assertNotNull(subStats);
+ assertEquals(3, subStats.getMsgBacklog());
+ assertEquals(3, subStats.getUnackedMessages());
+
+ for (int i = 3; i < NUM_OF_MESSAGES; i++) {
producer.send(String.format("%d", i).getBytes());
}
@@ -347,6 +358,13 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
// in case last commit is not updated
assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
+ // test that all messages are acked
+ stats = pulsarAdmin.topics().getStats(inputTopicName, true);
+ subStats = stats.getSubscriptions().get("public/default/" +
functionName);
+ assertNotNull(subStats);
+ assertEquals(0, subStats.getMsgBacklog());
+ assertEquals(0, subStats.getUnackedMessages());
+
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);