This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9725d675d [hotfix][pulsar] PulsarSource consumer ack exception. (#4237)
9725d675d is described below

commit 9725d675da00d06c7a6ac84ab658ab5db1ee0692
Author: lightzhao <[email protected]>
AuthorDate: Thu Mar 2 21:40:56 2023 +0800

    [hotfix][pulsar] PulsarSource consumer ack exception. (#4237)
    
    * fix pulsar source ack bug.
    
    * update doc.
    
    ---------
    
    Co-authored-by: lightzhao <[email protected]>
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |  1 +
 .../pulsar/exception/PulsarConnectorErrorCode.java |  3 ++-
 .../pulsar/source/reader/PulsarSourceReader.java   | 31 +++++++++++++---------
 .../source/reader/PulsarSplitReaderThread.java     |  4 +--
 4 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index a2b350743..8ba3120f0 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -208,6 +208,7 @@ problems encountered by users.
 | PULSAR-04 | Subscribe topic from pulsar failed               | When users 
encounter this error code, it means that Subscribe topic from pulsar failed, 
please check it               |
 | PULSAR-05 | Get last cursor of pulsar topic failed           | When users 
encounter this error code, it means that get last cursor of pulsar topic 
failed, please check it           |
 | PULSAR-06 | Get partition information of pulsar topic failed | When users 
encounter this error code, it means that Get partition information of pulsar 
topic failed, please check it |
+| PULSAR-07 | Pulsar consumer acknowledgeCumulative failed     | When users 
encounter this error code, it means that Pulsar consumer acknowledgeCumulative 
failed, please check it     |
 
 ## StarRocks Connector Error Codes
 
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
index 7bc153c05..4f9871b49 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
@@ -25,7 +25,8 @@ public enum PulsarConnectorErrorCode implements 
SeaTunnelErrorCode {
     PULSAR_AUTHENTICATION_FAILED("PULSAR-03", "Pulsar authentication failed"),
     SUBSCRIBE_TOPIC_FAILED("PULSAR-04", "Subscribe topic from pulsar failed"),
     GET_LAST_CURSOR_FAILED("PULSAR-05", "Get last cursor of pulsar topic 
failed"),
-    GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of 
pulsar topic failed");
+    GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of 
pulsar topic failed"),
+    ACK_CUMULATE_FAILED("PULSAR-07", "Pulsar consumer acknowledgeCumulative 
failed");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
index 9c986dd13..c008d4592 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
@@ -225,20 +226,26 @@ public class PulsarSourceReader<T> implements 
SourceReader<T, PulsarPartitionSpl
                     if (finishedSplits.contains(splitId)) {
                         return;
                     }
+                    try {
+                        splitReaders.get(splitId).committingCursor(messageId);
 
-                    splitReaders.get(splitId).committingCursor(messageId);
-
-                    if (pendingCursorsToFinish.containsKey(splitId)
-                            && 
pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
-                        finishedSplits.add(splitId);
-                        try {
-                            splitReaders.get(splitId).close();
-                        } catch (IOException e) {
-                            throw new PulsarConnectorException(
-                                    CommonErrorCode.READER_OPERATION_FAILED,
-                                    "Failed to close the split reader thread.",
-                                    e);
+                        if (pendingCursorsToFinish.containsKey(splitId)
+                                && 
pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
+                            finishedSplits.add(splitId);
+                            try {
+                                splitReaders.get(splitId).close();
+                            } catch (IOException e) {
+                                throw new PulsarConnectorException(
+                                        
CommonErrorCode.READER_OPERATION_FAILED,
+                                        "Failed to close the split reader 
thread.",
+                                        e);
+                            }
                         }
+                    } catch (PulsarClientException e) {
+                        throw new PulsarConnectorException(
+                                PulsarConnectorErrorCode.ACK_CUMULATE_FAILED,
+                                "pulsar consumer acknowledgeCumulative 
failed.",
+                                e);
                     }
                 });
     }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
index d81064096..d97cc8e36 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
@@ -118,11 +118,11 @@ public class PulsarSplitReaderThread extends Thread 
implements Closeable {
         }
     }
 
-    public void committingCursor(MessageId offsetsToCommit) {
+    public void committingCursor(MessageId offsetsToCommit) throws 
PulsarClientException {
         if (consumer == null) {
             consumer = createPulsarConsumer(split);
         }
-        consumer.acknowledgeAsync(offsetsToCommit);
+        consumer.acknowledgeCumulative(offsetsToCommit);
     }
 
     /** Create a specified {@link Consumer} by the given split information. */

Reply via email to