This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cd2bd6a40 [Hotfix][Connector-V2] Fix ConcurrentModificationException
when snapshotState based on SourceReaderBase (#4011)
cd2bd6a40 is described below
commit cd2bd6a40883d2e555f0ef3b5be2bb6dd495581a
Author: hailin0 <[email protected]>
AuthorDate: Wed Feb 1 10:08:42 2023 +0800
[Hotfix][Connector-V2] Fix ConcurrentModificationException when
snapshotState based on SourceReaderBase (#4011)
---
.../connectors/seatunnel/common/source/reader/SourceReaderBase.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 23dff321f..3f856e465 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -36,6 +36,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
* An abstract implementation of {@link SourceReader} which provides some
synchronization between
@@ -55,7 +57,7 @@ import java.util.concurrent.BlockingQueue;
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit,
SplitStateT>
implements SourceReader<T, SplitT> {
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
- private final Map<String, SplitContext<T, SplitStateT>> splitStates;
+ private final ConcurrentMap<String, SplitContext<T, SplitStateT>>
splitStates;
protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
protected final SourceReaderOptions options;
@@ -74,7 +76,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
this.elementsQueue = elementsQueue;
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
- this.splitStates = new HashMap<>();
+ this.splitStates = new ConcurrentHashMap<>();
this.options = options;
this.context = context;
}