This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 71b7f8c6c6 [Fix][Connector-v2][MongoDB-CDC] When starting multiple 
tasks to connect to different MongoDB services, only the first one takes effect 
(#10064)
71b7f8c6c6 is described below

commit 71b7f8c6c62c577dac0f6f6c10cb5ea09cbeb708
Author: Jast <[email protected]>
AuthorDate: Wed Nov 26 12:48:38 2025 +0800

    [Fix][Connector-v2][MongoDB-CDC] When starting multiple tasks to connect to 
different MongoDB services, only the first one takes effect (#10064)
---
 .../mongodb/internal/MongodbClientProvider.java    |  21 +-
 .../cdc/mongodb/source/dialect/MongodbDialect.java |  85 +++---
 .../source/fetch/MongodbFetchTaskContext.java      |  19 +-
 .../mongodb/source/fetch/MongodbScanFetchTask.java |  11 +-
 .../source/fetch/MongodbStreamFetchTask.java       |  10 +-
 .../seatunnel/cdc/mongodb/utils/MongodbUtils.java  |   2 +-
 .../test/java/mongodb/MongodbCDCMultiSourceIT.java | 298 +++++++++++++++++++++
 .../src/test/resources/mongodb_multi_source_a.conf |  52 ++++
 .../src/test/resources/mongodb_multi_source_b.conf |  52 ++++
 9 files changed, 482 insertions(+), 68 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
index dc621ed9dc..a733de035a 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
@@ -28,18 +28,13 @@ import lombok.extern.slf4j.Slf4j;
 public enum MongodbClientProvider {
     INSTANCE;
 
-    private volatile MongoClient mongoClient;
-
-    public MongoClient getOrCreateMongoClient(MongodbSourceConfig 
sourceConfig) {
-        if (mongoClient == null) {
-            ConnectionString connectionString =
-                    new ConnectionString(sourceConfig.getConnectionString());
-            log.info(
-                    "Create and register mongo client {}@{}",
-                    connectionString.getUsername(),
-                    connectionString.getHosts());
-            mongoClient = MongoClients.create(connectionString);
-        }
-        return mongoClient;
+    public MongoClient createMongoClient(MongodbSourceConfig sourceConfig) {
+        ConnectionString connectionString =
+                new ConnectionString(sourceConfig.getConnectionString());
+        log.info(
+                "Creating new mongo client {}@{}",
+                connectionString.getUsername(),
+                connectionString.getHosts());
+        return MongoClients.create(connectionString);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
index 25e463c17e..b820118086 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
@@ -39,8 +39,6 @@ import lombok.extern.slf4j.Slf4j;
 import javax.annotation.Nonnull;
 
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DIALECT_NAME;
@@ -56,9 +54,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
 @Slf4j
 public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
 
-    private final Map<MongodbSourceConfig, 
CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache =
-            new ConcurrentHashMap<>();
-
     @Override
     public String getName() {
         return DIALECT_NAME;
@@ -67,7 +62,7 @@ public class MongodbDialect implements 
DataSourceDialect<MongodbSourceConfig> {
     @Override
     public List<TableId> discoverDataCollections(MongodbSourceConfig 
sourceConfig) {
         CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
-                discoverAndCacheDataCollections(sourceConfig);
+                discoverDataCollectionsInfo(sourceConfig);
         return discoveryInfo.getDiscoveredCollections().stream()
                 .map(TableId::parse)
                 .collect(Collectors.toList());
@@ -97,7 +92,7 @@ public class MongodbDialect implements 
DataSourceDialect<MongodbSourceConfig> {
     public FetchTask.Context createFetchTaskContext(
             SourceSplitBase sourceSplitBase, MongodbSourceConfig sourceConfig) 
{
         CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
-                discoverAndCacheDataCollections(sourceConfig);
+                discoverDataCollectionsInfo(sourceConfig);
         ChangeStreamDescriptor changeStreamDescriptor =
                 getChangeStreamDescriptor(
                         sourceConfig,
@@ -106,48 +101,48 @@ public class MongodbDialect implements 
DataSourceDialect<MongodbSourceConfig> {
         return new MongodbFetchTaskContext(this, sourceConfig, 
changeStreamDescriptor);
     }
 
-    private CollectionDiscoveryUtils.CollectionDiscoveryInfo 
discoverAndCacheDataCollections(
+    private CollectionDiscoveryUtils.CollectionDiscoveryInfo 
discoverDataCollectionsInfo(
             MongodbSourceConfig sourceConfig) {
-        return cache.computeIfAbsent(
-                sourceConfig,
-                config -> {
-                    MongoClient mongoClient = createMongoClient(sourceConfig);
-                    List<String> discoveredDatabases =
-                            databaseNames(
-                                    mongoClient, 
databaseFilter(sourceConfig.getDatabaseList()));
-                    List<String> discoveredCollections =
-                            collectionNames(
-                                    mongoClient,
-                                    discoveredDatabases,
-                                    
collectionsFilter(sourceConfig.getCollectionList()));
-                    return new 
CollectionDiscoveryUtils.CollectionDiscoveryInfo(
-                            discoveredDatabases, discoveredCollections);
-                });
+        try (MongoClient mongoClient = createMongoClient(sourceConfig)) {
+            List<String> discoveredDatabases =
+                    databaseNames(mongoClient, 
databaseFilter(sourceConfig.getDatabaseList()));
+            List<String> discoveredCollections =
+                    collectionNames(
+                            mongoClient,
+                            discoveredDatabases,
+                            
collectionsFilter(sourceConfig.getCollectionList()));
+            log.debug("Closed temporary MongoClient used for collection 
discovery");
+            return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(
+                    discoveredDatabases, discoveredCollections);
+        }
     }
 
     public ChangeStreamOffset displayCurrentOffset(MongodbSourceConfig 
sourceConfig) {
-        MongoClient mongoClient = createMongoClient(sourceConfig);
-        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
-                discoverAndCacheDataCollections(sourceConfig);
-        ChangeStreamDescriptor changeStreamDescriptor =
-                getChangeStreamDescriptor(
-                        sourceConfig,
-                        discoveryInfo.getDiscoveredDatabases(),
-                        discoveryInfo.getDiscoveredCollections());
-        BsonDocument startupResumeToken = getLatestResumeToken(mongoClient, 
changeStreamDescriptor);
-
-        ChangeStreamOffset changeStreamOffset;
-        if (startupResumeToken != null) {
-            changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
-            log.info(
-                    "startup resume token={},change stream offset={}",
-                    startupResumeToken,
-                    changeStreamOffset);
-
-        } else {
-            changeStreamOffset = new 
ChangeStreamOffset(getCurrentClusterTime(mongoClient));
+        try (MongoClient mongoClient = createMongoClient(sourceConfig)) {
+            CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
+                    discoverDataCollectionsInfo(sourceConfig);
+            ChangeStreamDescriptor changeStreamDescriptor =
+                    getChangeStreamDescriptor(
+                            sourceConfig,
+                            discoveryInfo.getDiscoveredDatabases(),
+                            discoveryInfo.getDiscoveredCollections());
+            BsonDocument startupResumeToken =
+                    getLatestResumeToken(mongoClient, changeStreamDescriptor);
+
+            ChangeStreamOffset changeStreamOffset;
+            if (startupResumeToken != null) {
+                changeStreamOffset = new 
ChangeStreamOffset(startupResumeToken);
+                log.info(
+                        "startup resume token={},change stream offset={}",
+                        startupResumeToken,
+                        changeStreamOffset);
+
+            } else {
+                changeStreamOffset = new 
ChangeStreamOffset(getCurrentClusterTime(mongoClient));
+            }
+
+            log.debug("Closed temporary MongoClient used for displaying 
current offset");
+            return changeStreamOffset;
         }
-
-        return changeStreamOffset;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
index ef514104d6..bed0edc067 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
@@ -37,12 +37,14 @@ import org.bson.BsonString;
 import org.bson.BsonType;
 import org.bson.BsonValue;
 
+import com.mongodb.client.MongoClient;
 import com.mongodb.client.model.changestream.OperationType;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
 import io.debezium.util.LoggingContext;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nonnull;
 
@@ -72,6 +74,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
 
+@Slf4j
 public class MongodbFetchTaskContext implements FetchTask.Context {
 
     private final MongodbDialect dialect;
@@ -79,6 +82,8 @@ public class MongodbFetchTaskContext implements 
FetchTask.Context {
     private final ChangeStreamDescriptor changeStreamDescriptor;
     private ChangeEventQueue<DataChangeEvent> changeEventQueue;
 
+    private final MongoClient mongoClient;
+
     public MongodbFetchTaskContext(
             MongodbDialect dialect,
             MongodbSourceConfig sourceConfig,
@@ -86,6 +91,7 @@ public class MongodbFetchTaskContext implements 
FetchTask.Context {
         this.dialect = dialect;
         this.sourceConfig = sourceConfig;
         this.changeStreamDescriptor = changeStreamDescriptor;
+        this.mongoClient = createMongoClient(sourceConfig);
     }
 
     public void configure(@Nonnull SourceSplitBase sourceSplitBase) {
@@ -126,6 +132,10 @@ public class MongodbFetchTaskContext implements 
FetchTask.Context {
         return changeEventQueue;
     }
 
+    public MongoClient getMongoClient() {
+        return mongoClient;
+    }
+
     @Override
     public TableId getTableId(SourceRecord record) {
         return MongodbRecordUtils.getTableId(record);
@@ -263,7 +273,12 @@ public class MongodbFetchTaskContext implements 
FetchTask.Context {
 
     @Override
     public void close() {
-        Runtime.getRuntime()
-                .addShutdownHook(new Thread(() -> 
createMongoClient(sourceConfig).close()));
+        if (mongoClient != null) {
+            try {
+                mongoClient.close();
+            } catch (Exception e) {
+                log.error("Failed to close MongoClient", e);
+            }
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
index 32c69c463c..73f0c3ed61 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
@@ -65,7 +65,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createPartitionMap;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createSourceOffsetMap;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createWatermarkPartitionMap;
-import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getMongoCollection;
 
 @Slf4j
@@ -102,7 +101,9 @@ public class MongodbScanFetchTask implements 
FetchTask<SourceSplitBase> {
                                 lowWatermark)));
 
         log.info("Snapshot step 2 - Snapshotting data");
-        try (MongoCursor<RawBsonDocument> cursor = 
getSnapshotCursor(snapshotSplit, sourceConfig)) {
+        MongoClient mongoClient = taskContext.getMongoClient();
+        try (MongoCursor<RawBsonDocument> cursor =
+                getSnapshotCursor(snapshotSplit, sourceConfig, mongoClient)) {
             while (cursor.hasNext()) {
                 checkTaskRunning();
                 BsonDocument valueDocument = 
normalizeSnapshotDocument(collectionId, cursor.next());
@@ -163,8 +164,9 @@ public class MongodbScanFetchTask implements 
FetchTask<SourceSplitBase> {
 
     @Nonnull
     private MongoCursor<RawBsonDocument> getSnapshotCursor(
-            @Nonnull SnapshotSplit snapshotSplit, MongodbSourceConfig 
sourceConfig) {
-        MongoClient mongoClient = createMongoClient(sourceConfig);
+            @Nonnull SnapshotSplit snapshotSplit,
+            MongodbSourceConfig sourceConfig,
+            MongoClient mongoClient) {
         MongoCollection<RawBsonDocument> collection =
                 getMongoCollection(mongoClient, snapshotSplit.getTableId(), 
RawBsonDocument.class);
         BsonDocument startKey = (BsonDocument) 
snapshotSplit.getSplitStart()[1];
@@ -176,6 +178,7 @@ public class MongodbScanFetchTask implements 
FetchTask<SourceSplitBase> {
                 startKey,
                 endKey,
                 hint);
+
         return collection
                 .find()
                 .min(startKey)
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
index 0dfc91d826..1807f0a607 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
@@ -83,7 +83,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createWatermarkPartitionMap;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.currentBsonTimestamp;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
-import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getChangeStreamIterable;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getCurrentClusterTime;
 
@@ -94,6 +93,7 @@ public class MongodbStreamFetchTask implements 
FetchTask<SourceSplitBase> {
     private volatile boolean taskRunning = false;
 
     private MongodbSourceConfig sourceConfig;
+    private MongoClient mongoClient;
     private final Time time = new SystemTime();
     private boolean supportsStartAtOperationTime = true;
     private boolean supportsStartAfter = true;
@@ -110,7 +110,7 @@ public class MongodbStreamFetchTask implements 
FetchTask<SourceSplitBase> {
         ChangeStreamDescriptor descriptor = 
taskContext.getChangeStreamDescriptor();
         ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();
 
-        MongoClient mongoClient = createMongoClient(sourceConfig);
+        this.mongoClient = taskContext.getMongoClient();
         MongoChangeStreamCursor<BsonDocument> changeStreamCursor =
                 openChangeStreamCursor(descriptor);
         HeartbeatManager heartbeatManager = 
openHeartbeatManagerIfNeeded(changeStreamCursor);
@@ -260,7 +260,11 @@ public class MongodbStreamFetchTask implements 
FetchTask<SourceSplitBase> {
                 new 
ChangeStreamOffset(streamSplit.getStartupOffset().getOffset());
 
         ChangeStreamIterable<Document> changeStreamIterable =
-                getChangeStreamIterable(sourceConfig, changeStreamDescriptor);
+                getChangeStreamIterable(
+                        mongoClient,
+                        changeStreamDescriptor,
+                        sourceConfig.getBatchSize(),
+                        sourceConfig.isUpdateLookup());
 
         BsonDocument resumeToken = offset.getResumeToken();
         BsonTimestamp timestamp = offset.getTimestamp();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
index 4bac103032..923fe330f1 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
@@ -377,7 +377,7 @@ public class MongodbUtils {
     }
 
     public static MongoClient createMongoClient(MongodbSourceConfig 
sourceConfig) {
-        return 
MongodbClientProvider.INSTANCE.getOrCreateMongoClient(sourceConfig);
+        return MongodbClientProvider.INSTANCE.createMongoClient(sourceConfig);
     }
 
     public static @Nonnull ConnectionString buildConnectionString(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCMultiSourceIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCMultiSourceIT.java
new file mode 100644
index 0000000000..840cab7f5d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCMultiSourceIT.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mongodb;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.bson.Document;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.with;
+import static org.testcontainers.shaded.org.awaitility.Durations.TWO_SECONDS;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support cdc")
+public class MongodbCDCMultiSourceIT extends TestSuiteBase implements 
TestResource {
+
+    protected static final String MONGODB_DATABASE_A = "inventory_a";
+    protected static final String MONGODB_COLLECTION_A = "products_a";
+    protected MongoDBContainer mongodbContainerA;
+    protected MongoClient clientA;
+
+    protected static final String MONGODB_DATABASE_B = "inventory_b";
+    protected static final String MONGODB_COLLECTION_B = "products_b";
+    protected MongoDBContainer mongodbContainerB;
+    protected MongoClient clientB;
+
+    private static final String MYSQL_HOST = "mysql_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mongodb_cdc";
+    private static final String MYSQL_DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
+
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer();
+    private final UniqueDatabase database = new 
UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE);
+
+    private static MySqlContainer createMySqlContainer() {
+        MySqlContainer mySqlContainer = new MySqlContainer(MySqlVersion.V8_0);
+        mySqlContainer.withNetwork(NETWORK);
+        mySqlContainer.withNetworkAliases(MYSQL_HOST);
+        mySqlContainer.withDatabaseName(MYSQL_DATABASE);
+        mySqlContainer.withUsername(MYSQL_USER_NAME);
+        mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
+        mySqlContainer.withLogConsumer(
+                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image")));
+        mySqlContainer.setPortBindings(Collections.singletonList("3310:3306"));
+        return mySqlContainer;
+    }
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && wget "
+                                        + MYSQL_DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
+            };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        log.info("Starting MySQL container...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        log.info("MySQL container started");
+        database.createAndInitialize();
+        log.info("MySQL database initialized");
+
+        log.info("Starting MongoDB A container...");
+        mongodbContainerA =
+                new MongoDBContainer(NETWORK, 
MongoDBContainer.ShardingClusterRole.SHARD);
+        mongodbContainerA.withNetworkAliases("mongo0");
+        
mongodbContainerA.setPortBindings(Collections.singletonList("27017:27017"));
+        mongodbContainerA.withLogConsumer(
+                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("MongoDB-A-Docker-Image")));
+        Startables.deepStart(Stream.of(mongodbContainerA)).join();
+        log.info("MongoDB A container started");
+
+        log.info("Starting MongoDB B container...");
+        mongodbContainerB =
+                new MongoDBContainer(NETWORK, 
MongoDBContainer.ShardingClusterRole.SHARD);
+        mongodbContainerB.withNetworkAliases("mongo1");
+        
mongodbContainerB.setPortBindings(Collections.singletonList("27018:27017"));
+        mongodbContainerB.withLogConsumer(
+                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("MongoDB-B-Docker-Image")));
+        Startables.deepStart(Stream.of(mongodbContainerB)).join();
+        log.info("MongoDB B container started");
+
+        initMongoDBConnections();
+        initMongoDBData();
+    }
+
+    private void initMongoDBConnections() {
+        String ipAddressA = mongodbContainerA.getHost();
+        Integer portA = mongodbContainerA.getFirstMappedPort();
+        String urlA =
+                String.format(
+                        "mongodb://%s:%s@%s:%d/%s?authSource=admin",
+                        "superuser", "superpw", ipAddressA, portA, 
MONGODB_DATABASE_A);
+        clientA = MongoClients.create(urlA);
+        log.info("Connected to MongoDB A at {}:{}", ipAddressA, portA);
+
+        String ipAddressB = mongodbContainerB.getHost();
+        Integer portB = mongodbContainerB.getFirstMappedPort();
+        String urlB =
+                String.format(
+                        "mongodb://%s:%s@%s:%d/%s?authSource=admin",
+                        "superuser", "superpw", ipAddressB, portB, 
MONGODB_DATABASE_B);
+        clientB = MongoClients.create(urlB);
+        log.info("Connected to MongoDB B at {}:{}", ipAddressB, portB);
+    }
+
+    private void initMongoDBData() {
+        MongoCollection<Document> collectionA =
+                
clientA.getDatabase(MONGODB_DATABASE_A).getCollection(MONGODB_COLLECTION_A);
+        collectionA.deleteMany(new Document());
+        List<Document> dataA = new ArrayList<>();
+        dataA.add(new Document("_id", "A001").append("name", "Product 
A1").append("price", 100));
+        dataA.add(new Document("_id", "A002").append("name", "Product 
A2").append("price", 200));
+        dataA.add(new Document("_id", "A003").append("name", "Product 
A3").append("price", 300));
+        collectionA.insertMany(dataA);
+        log.info("Inserted {} documents into MongoDB A", dataA.size());
+
+        MongoCollection<Document> collectionB =
+                
clientB.getDatabase(MONGODB_DATABASE_B).getCollection(MONGODB_COLLECTION_B);
+        collectionB.deleteMany(new Document());
+        List<Document> dataB = new ArrayList<>();
+        dataB.add(new Document("_id", "B001").append("name", "Product 
B1").append("price", 150));
+        dataB.add(new Document("_id", "B002").append("name", "Product 
B2").append("price", 250));
+        dataB.add(new Document("_id", "B003").append("name", "Product 
B3").append("price", 350));
+        collectionB.insertMany(dataB);
+        log.info("Inserted {} documents into MongoDB B", dataB.size());
+    }
+
+    @TestTemplate
+    public void testMultipleMongoDBSourcesSequentially(TestContainer 
container) throws Exception {
+        createMySqlTables();
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob("/mongodb_multi_source_a.conf");
+                    } catch (Exception e) {
+                        log.error("MongoDB A job exception: " + 
e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+
+        assertMySqlHasData("products_a", 3);
+        log.info("MongoDB A data verified in MySQL");
+
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob("/mongodb_multi_source_b.conf");
+                    } catch (Exception e) {
+                        log.error("MongoDB B job exception: " + 
e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+
+        assertMySqlHasData("products_b", 3);
+        log.info("MongoDB B data verified in MySQL");
+    }
+
+    private void createMySqlTables() throws SQLException {
+        try (Connection connection = getJdbcConnection()) {
+            String createTableA =
+                    "CREATE TABLE IF NOT EXISTS products_a ("
+                            + "_id VARCHAR(255) PRIMARY KEY, "
+                            + "name VARCHAR(255), "
+                            + "price INT"
+                            + ")";
+            connection.createStatement().execute(createTableA);
+            log.info("Created table products_a");
+
+            String createTableB =
+                    "CREATE TABLE IF NOT EXISTS products_b ("
+                            + "_id VARCHAR(255) PRIMARY KEY, "
+                            + "name VARCHAR(255), "
+                            + "price INT"
+                            + ")";
+            connection.createStatement().execute(createTableB);
+            log.info("Created table products_b");
+        }
+    }
+
+    private void assertMySqlHasData(String tableName, int expectedCount) {
+        with().pollInterval(TWO_SECONDS)
+                .pollDelay(500, TimeUnit.MILLISECONDS)
+                .await()
+                .atMost(5, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            try (Connection connection = getJdbcConnection()) {
+                                String sql = String.format("SELECT COUNT(*) 
FROM %s", tableName);
+                                try (ResultSet rs =
+                                        
connection.createStatement().executeQuery(sql)) {
+                                    if (rs.next()) {
+                                        int count = rs.getInt(1);
+                                        log.info("Table {} has {} rows", 
tableName, count);
+                                        Assertions.assertEquals(
+                                                expectedCount,
+                                                count,
+                                                String.format(
+                                                        "Expected %d rows in 
%s but found %d",
+                                                        expectedCount, 
tableName, count));
+                                    }
+                                }
+                            } catch (SQLException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+    }
+
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                MYSQL_CONTAINER.getJdbcUrl(),
+                MYSQL_CONTAINER.getUsername(),
+                MYSQL_CONTAINER.getPassword());
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (clientA != null) {
+            clientA.close();
+        }
+
+        if (clientB != null) {
+            clientB.close();
+        }
+
+        if (mongodbContainerA != null) {
+            mongodbContainerA.stop();
+        }
+
+        if (mongodbContainerB != null) {
+            mongodbContainerB.stop();
+        }
+
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_a.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_a.conf
new file mode 100644
index 0000000000..f7c0c1d05b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_a.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory_a"]
+    collection = ["inventory_a.products_a"]
+    username = superuser
+    password = superpw
+    schema = {
+      fields {
+        "_id": string
+        "name": string
+        "price": int
+      }
+    }
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
+    driver = "com.mysql.cj.jdbc.Driver"
+    username = "st_user"
+    password = "seatunnel"
+    generate_sink_sql = true
+    database = mongodb_cdc
+    table = "products_a"
+    primary_keys = ["_id"]
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_b.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_b.conf
new file mode 100644
index 0000000000..d1f55dab0d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_b.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MongoDB-CDC {
+    hosts = "mongo1:27017"
+    database = ["inventory_b"]
+    collection = ["inventory_b.products_b"]
+    username = superuser
+    password = superpw
+    schema = {
+      fields {
+        "_id": string
+        "name": string
+        "price": int
+      }
+    }
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
+    driver = "com.mysql.cj.jdbc.Driver"
+    username = "st_user"
+    password = "seatunnel"
+    generate_sink_sql = true
+    database = mongodb_cdc
+    table = "products_b"
+    primary_keys = ["_id"]
+  }
+}


Reply via email to