This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e79d3b4c5af6117bd8ac38fe25be6c4ff41c0c76
Author: Yufan Sheng <[email protected]>
AuthorDate: Mon Sep 5 23:57:53 2022 +0800

    [FLINK-28084][Connector/pulsar] Disable retry and delete reconsume logic on 
PulsarUnorderedPartitionSplitReader.
---
 .../reader/split/PulsarUnorderedPartitionSplitReader.java    | 12 ------------
 1 file changed, 12 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 1532044983f..73223acc9af 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -58,8 +58,6 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends 
PulsarPartitionSpl
     private static final Logger LOG =
             LoggerFactory.getLogger(PulsarUnorderedPartitionSplitReader.class);
 
-    private static final Duration REDELIVER_TIME = Duration.ofSeconds(3);
-
     private final TransactionCoordinatorClient coordinatorClient;
 
     @Nullable private Transaction uncommittedTransaction;
@@ -98,18 +96,8 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
                         .acknowledgeAsync(message.getMessageId(), 
uncommittedTransaction)
                         .get();
             } catch (InterruptedException e) {
-                sneakyClient(
-                        () ->
-                                pulsarConsumer.reconsumeLater(
-                                        message, REDELIVER_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
                 Thread.currentThread().interrupt();
                 throw e;
-            } catch (ExecutionException e) {
-                sneakyClient(
-                        () ->
-                                pulsarConsumer.reconsumeLater(
-                                        message, REDELIVER_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
-                throw e;
             }
         }
 

Reply via email to