This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 510ac3b230c [fix][fn] ack messages for window function when its result is null (#23618)
510ac3b230c is described below

commit 510ac3b230c29bc409cc0a6004aee545af31e3b2
Author: jiangpengcheng <[email protected]>
AuthorDate: Fri Nov 22 09:05:01 2024 +0800

    [fix][fn] ack messages for window function when its result is null (#23618)
    
    (cherry picked from commit 024ff7574b55104b2460c7969eb127577bfb54dc)
---
 .../functions/windowing/WindowFunctionExecutor.java  |  7 +++++++
 .../integration/functions/PulsarFunctionsTest.java   | 20 +++++++++++++++++++-
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index c6ca4e65d33..1e492d74aa6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -238,6 +238,13 @@ public class WindowFunctionExecutor<T, X> implements Function<T, X> {
                     }
                 }
             });
+        } else {
+            // When window function return null, needs to be acked directly.
+            if (windowConfig.getProcessingGuarantees() == WindowConfig.ProcessingGuarantees.ATLEAST_ONCE) {
+                for (Record<T> record : tuples) {
+                    record.ack();
+                }
+            }
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index e6ba6acff83..9ecd46e777e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -321,7 +322,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                 .enableBatching(false)
                 .create();
 
-        for (int i = 0; i < NUM_OF_MESSAGES; i++) {
+        // send 3 messages first, and it won't trigger the window and so these 3 messages will not be acked
+        for (int i = 0; i < 3; i++) {
+            producer.send(String.format("%d", i).getBytes());
+        }
+        TopicStats stats = pulsarAdmin.topics().getStats(inputTopicName, true);
+        SubscriptionStats subStats = stats.getSubscriptions().get("public/default/" + functionName);
+        assertNotNull(subStats);
+        assertEquals(3, subStats.getMsgBacklog());
+        assertEquals(3, subStats.getUnackedMessages());
+
+        for (int i = 3; i < NUM_OF_MESSAGES; i++) {
             producer.send(String.format("%d", i).getBytes());
         }
 
@@ -347,6 +358,13 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         // in case last commit is not updated
         assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
 
+        // test that all messages are acked
+        stats = pulsarAdmin.topics().getStats(inputTopicName, true);
+        subStats = stats.getSubscriptions().get("public/default/" + functionName);
+        assertNotNull(subStats);
+        assertEquals(0, subStats.getMsgBacklog());
+        assertEquals(0, subStats.getUnackedMessages());
+
         deleteFunction(functionName);
 
         getFunctionInfoNotFound(functionName);

Reply via email to