This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4412ba7a13d82225f3082e5ee2f475f4be572d84
Author: Yufan Sheng <[email protected]>
AuthorDate: Mon Sep 5 23:59:52 2022 +0800

    [FLINK-27611][Connector/pulsar] Fix ConcurrentModificationException during 
checkpoint on Pulsar unordered reader.
---
 .../reader/source/PulsarUnorderedSourceReader.java | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
index 09937d8ea05..b20b117652d 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
@@ -46,6 +46,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.function.Supplier;
 
+import static java.util.stream.Collectors.toList;
+
 /**
  * The source reader for pulsar subscription Shared and Key_Shared, which 
consumes the unordered
  * messages.
@@ -159,19 +161,19 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
         LOG.debug("Committing transactions for checkpoint {}", checkpointId);
 
         if (coordinatorClient != null) {
-            for (Map.Entry<Long, List<TxnID>> entry : 
transactionsToCommit.entrySet()) {
-                Long currentCheckpointId = entry.getKey();
-                if (currentCheckpointId > checkpointId) {
-                    continue;
-                }
-
-                List<TxnID> transactions = entry.getValue();
-                for (TxnID transaction : transactions) {
-                    coordinatorClient.commit(transaction);
-                    transactionsOfFinishedSplits.remove(transaction);
+            List<Long> checkpointIds =
+                    transactionsToCommit.keySet().stream()
+                            .filter(id -> id <= checkpointId)
+                            .collect(toList());
+
+            for (Long id : checkpointIds) {
+                List<TxnID> transactions = transactionsToCommit.remove(id);
+                if (transactions != null) {
+                    for (TxnID transaction : transactions) {
+                        coordinatorClient.commit(transaction);
+                        transactionsOfFinishedSplits.remove(transaction);
+                    }
                 }
-
-                transactionsToCommit.remove(currentCheckpointId);
             }
         }
     }

Reply via email to