This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 417be6a96c3e38ec1974d1a90bd66e06c8c5bc68
Author: Yufan Sheng <[email protected]>
AuthorDate: Mon Sep 5 23:57:53 2022 +0800

    [FLINK-28084][Connector/pulsar] Disable retry and delete reconsume logic on PulsarUnorderedPartitionSplitReader.
---
 .../reader/split/PulsarUnorderedPartitionSplitReader.java | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 4f0d444061b..daf721c78db 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -59,8 +59,6 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl
     private static final Logger LOG =
             LoggerFactory.getLogger(PulsarUnorderedPartitionSplitReader.class);
 
-    private static final Duration REDELIVER_TIME = Duration.ofSeconds(3);
-
     private final TransactionCoordinatorClient coordinatorClient;
 
     @Nullable private Transaction uncommittedTransaction;
@@ -100,18 +98,8 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl
                         .acknowledgeAsync(message.getMessageId(), uncommittedTransaction)
                         .get();
             } catch (InterruptedException e) {
-                sneakyClient(
-                        () ->
-                                pulsarConsumer.reconsumeLater(
-                                        message, REDELIVER_TIME.toMillis(), TimeUnit.MILLISECONDS));
                 Thread.currentThread().interrupt();
                 throw e;
-            } catch (ExecutionException e) {
-                sneakyClient(
-                        () ->
-                                pulsarConsumer.reconsumeLater(
-                                        message, REDELIVER_TIME.toMillis(), TimeUnit.MILLISECONDS));
-                throw e;
             }
         }
