This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e12e0386a1f06c14a377ed157cd32c2a3fae6cca
Author: Yufan Sheng <[email protected]>
AuthorDate: Mon Sep 5 23:59:52 2022 +0800

    [FLINK-27611][Connector/pulsar] Fix ConcurrentModificationException during checkpoint on Pulsar unordered reader.
---
 .../reader/source/PulsarUnorderedSourceReader.java | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
index cf6de503d74..77366c2c842 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
@@ -47,6 +47,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.function.Supplier;
 
+import static java.util.stream.Collectors.toList;
+
 /**
  * The source reader for pulsar subscription Shared and Key_Shared, which consumes the unordered
  * messages.
@@ -162,19 +164,19 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT
         LOG.debug("Committing transactions for checkpoint {}", checkpointId);
 
         if (coordinatorClient != null) {
-            for (Map.Entry<Long, List<TxnID>> entry : transactionsToCommit.entrySet()) {
-                Long currentCheckpointId = entry.getKey();
-                if (currentCheckpointId > checkpointId) {
-                    continue;
-                }
-
-                List<TxnID> transactions = entry.getValue();
-                for (TxnID transaction : transactions) {
-                    coordinatorClient.commit(transaction);
-                    transactionsOfFinishedSplits.remove(transaction);
+            List<Long> checkpointIds =
+                    transactionsToCommit.keySet().stream()
+                            .filter(id -> id <= checkpointId)
+                            .collect(toList());
+
+            for (Long id : checkpointIds) {
+                List<TxnID> transactions = transactionsToCommit.remove(id);
+                if (transactions != null) {
+                    for (TxnID transaction : transactions) {
+                        coordinatorClient.commit(transaction);
+                        transactionsOfFinishedSplits.remove(transaction);
+                    }
                 }
-
-                transactionsToCommit.remove(currentCheckpointId);
             }
         }
     }
