This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 71b7f8c6c6 [Fix][Connector-v2][MongoDB-CDC] When starting multiple tasks to connect to different MongoDB services, only the first one takes effect (#10064)
71b7f8c6c6 is described below
commit 71b7f8c6c62c577dac0f6f6c10cb5ea09cbeb708
Author: Jast <[email protected]>
AuthorDate: Wed Nov 26 12:48:38 2025 +0800
[Fix][Connector-v2][MongoDB-CDC] When starting multiple tasks to connect to different MongoDB services, only the first one takes effect (#10064)
---
.../mongodb/internal/MongodbClientProvider.java | 21 +-
.../cdc/mongodb/source/dialect/MongodbDialect.java | 85 +++---
.../source/fetch/MongodbFetchTaskContext.java | 19 +-
.../mongodb/source/fetch/MongodbScanFetchTask.java | 11 +-
.../source/fetch/MongodbStreamFetchTask.java | 10 +-
.../seatunnel/cdc/mongodb/utils/MongodbUtils.java | 2 +-
.../test/java/mongodb/MongodbCDCMultiSourceIT.java | 298 +++++++++++++++++++++
.../src/test/resources/mongodb_multi_source_a.conf | 52 ++++
.../src/test/resources/mongodb_multi_source_b.conf | 52 ++++
9 files changed, 482 insertions(+), 68 deletions(-)
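
For readers skimming the patch: the root cause is in the first hunk below, where the enum provider cached a single MongoClient, so every later job silently reused the first job's connection string. A minimal, self-contained sketch of that failure mode and of the per-config fix (simplified stand-ins, not the connector's real classes, and not part of the commit):

    import java.util.Arrays;
    import java.util.List;

    // Stand-in for MongodbClientProvider; the "client" is modeled as its connection URI.
    enum ClientProviderSketch {
        INSTANCE;

        private volatile String cachedClient;

        // Old behavior: the first URI wins and later source configs are silently ignored.
        String getOrCreateClient(String uri) {
            if (cachedClient == null) {
                cachedClient = uri;
            }
            return cachedClient;
        }

        // New behavior: every source config gets its own client.
        String createClient(String uri) {
            return uri;
        }
    }

    public class MultiSourceSketch {
        public static void main(String[] args) {
            List<String> uris = Arrays.asList("mongodb://mongo0:27017", "mongodb://mongo1:27017");
            // Cached singleton: both tasks end up talking to mongo0.
            uris.forEach(u -> System.out.println("cached  -> " + ClientProviderSketch.INSTANCE.getOrCreateClient(u)));
            // Per-config creation: each task connects to its own cluster.
            uris.forEach(u -> System.out.println("per-job -> " + ClientProviderSketch.INSTANCE.createClient(u)));
        }
    }
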
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
index dc621ed9dc..a733de035a 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
@@ -28,18 +28,13 @@ import lombok.extern.slf4j.Slf4j;
public enum MongodbClientProvider {
INSTANCE;
- private volatile MongoClient mongoClient;
-
- public MongoClient getOrCreateMongoClient(MongodbSourceConfig sourceConfig) {
- if (mongoClient == null) {
- ConnectionString connectionString =
- new ConnectionString(sourceConfig.getConnectionString());
- log.info(
- "Create and register mongo client {}@{}",
- connectionString.getUsername(),
- connectionString.getHosts());
- mongoClient = MongoClients.create(connectionString);
- }
- return mongoClient;
+ public MongoClient createMongoClient(MongodbSourceConfig sourceConfig) {
+ ConnectionString connectionString =
+ new ConnectionString(sourceConfig.getConnectionString());
+ log.info(
+ "Creating new mongo client {}@{}",
+ connectionString.getUsername(),
+ connectionString.getHosts());
+ return MongoClients.create(connectionString);
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
index 25e463c17e..b820118086 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
@@ -39,8 +39,6 @@ import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DIALECT_NAME;
@@ -56,9 +54,6 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
@Slf4j
public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
- private final Map<MongodbSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache =
- new ConcurrentHashMap<>();
-
@Override
public String getName() {
return DIALECT_NAME;
@@ -67,7 +62,7 @@ public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
@Override
public List<TableId> discoverDataCollections(MongodbSourceConfig sourceConfig) {
CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
- discoverAndCacheDataCollections(sourceConfig);
+ discoverDataCollectionsInfo(sourceConfig);
return discoveryInfo.getDiscoveredCollections().stream()
.map(TableId::parse)
.collect(Collectors.toList());
@@ -97,7 +92,7 @@ public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
public FetchTask.Context createFetchTaskContext(
SourceSplitBase sourceSplitBase, MongodbSourceConfig sourceConfig) {
CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
- discoverAndCacheDataCollections(sourceConfig);
+ discoverDataCollectionsInfo(sourceConfig);
ChangeStreamDescriptor changeStreamDescriptor =
getChangeStreamDescriptor(
sourceConfig,
@@ -106,48 +101,48 @@ public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
return new MongodbFetchTaskContext(this, sourceConfig, changeStreamDescriptor);
}
- private CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverAndCacheDataCollections(
+ private CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverDataCollectionsInfo(
MongodbSourceConfig sourceConfig) {
- return cache.computeIfAbsent(
- sourceConfig,
- config -> {
- MongoClient mongoClient = createMongoClient(sourceConfig);
- List<String> discoveredDatabases =
- databaseNames(
- mongoClient, databaseFilter(sourceConfig.getDatabaseList()));
- List<String> discoveredCollections =
- collectionNames(
- mongoClient,
- discoveredDatabases,
- collectionsFilter(sourceConfig.getCollectionList()));
- return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(
- discoveredDatabases, discoveredCollections);
- });
+ try (MongoClient mongoClient = createMongoClient(sourceConfig)) {
+ List<String> discoveredDatabases =
+ databaseNames(mongoClient, databaseFilter(sourceConfig.getDatabaseList()));
+ List<String> discoveredCollections =
+ collectionNames(
+ mongoClient,
+ discoveredDatabases,
+ collectionsFilter(sourceConfig.getCollectionList()));
+ log.debug("Closed temporary MongoClient used for collection discovery");
+ return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(
+ discoveredDatabases, discoveredCollections);
+ }
}
public ChangeStreamOffset displayCurrentOffset(MongodbSourceConfig sourceConfig) {
- MongoClient mongoClient = createMongoClient(sourceConfig);
- CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
- discoverAndCacheDataCollections(sourceConfig);
- ChangeStreamDescriptor changeStreamDescriptor =
- getChangeStreamDescriptor(
- sourceConfig,
- discoveryInfo.getDiscoveredDatabases(),
- discoveryInfo.getDiscoveredCollections());
- BsonDocument startupResumeToken = getLatestResumeToken(mongoClient, changeStreamDescriptor);
-
- ChangeStreamOffset changeStreamOffset;
- if (startupResumeToken != null) {
- changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
- log.info(
- "startup resume token={},change stream offset={}",
- startupResumeToken,
- changeStreamOffset);
-
- } else {
- changeStreamOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
+ try (MongoClient mongoClient = createMongoClient(sourceConfig)) {
+ CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo =
+ discoverDataCollectionsInfo(sourceConfig);
+ ChangeStreamDescriptor changeStreamDescriptor =
+ getChangeStreamDescriptor(
+ sourceConfig,
+ discoveryInfo.getDiscoveredDatabases(),
+ discoveryInfo.getDiscoveredCollections());
+ BsonDocument startupResumeToken =
+ getLatestResumeToken(mongoClient, changeStreamDescriptor);
+
+ ChangeStreamOffset changeStreamOffset;
+ if (startupResumeToken != null) {
+ changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
+ log.info(
+ "startup resume token={},change stream offset={}",
+ startupResumeToken,
+ changeStreamOffset);
+
+ } else {
+ changeStreamOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
+ }
+
+ log.debug("Closed temporary MongoClient used for displaying current offset");
+ return changeStreamOffset;
}
-
- return changeStreamOffset;
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
index ef514104d6..bed0edc067 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
@@ -37,12 +37,14 @@ import org.bson.BsonString;
import org.bson.BsonType;
import org.bson.BsonValue;
+import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.LoggingContext;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
@@ -72,6 +74,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
+@Slf4j
public class MongodbFetchTaskContext implements FetchTask.Context {
private final MongodbDialect dialect;
@@ -79,6 +82,8 @@ public class MongodbFetchTaskContext implements FetchTask.Context {
private final ChangeStreamDescriptor changeStreamDescriptor;
private ChangeEventQueue<DataChangeEvent> changeEventQueue;
+ private final MongoClient mongoClient;
+
public MongodbFetchTaskContext(
MongodbDialect dialect,
MongodbSourceConfig sourceConfig,
@@ -86,6 +91,7 @@ public class MongodbFetchTaskContext implements FetchTask.Context {
this.dialect = dialect;
this.sourceConfig = sourceConfig;
this.changeStreamDescriptor = changeStreamDescriptor;
+ this.mongoClient = createMongoClient(sourceConfig);
}
public void configure(@Nonnull SourceSplitBase sourceSplitBase) {
@@ -126,6 +132,10 @@ public class MongodbFetchTaskContext implements FetchTask.Context {
return changeEventQueue;
}
+ public MongoClient getMongoClient() {
+ return mongoClient;
+ }
+
@Override
public TableId getTableId(SourceRecord record) {
return MongodbRecordUtils.getTableId(record);
@@ -263,7 +273,12 @@ public class MongodbFetchTaskContext implements FetchTask.Context {
@Override
public void close() {
- Runtime.getRuntime()
- .addShutdownHook(new Thread(() -> createMongoClient(sourceConfig).close()));
+ if (mongoClient != null) {
+ try {
+ mongoClient.close();
+ } catch (Exception e) {
+ log.error("Failed to close MongoClient", e);
+ }
+ }
}
}
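
The close() change above is the other half of the fix: the fetch task context now creates its MongoClient in the constructor, hands it to the scan and stream fetch tasks, and closes it in close(), instead of registering a JVM shutdown hook that built a fresh client just to close it. A minimal sketch of that own-and-close lifecycle (illustrative names, not the connector's API, and not part of the commit):

    // Acquire in the constructor, share via a getter, release in close().
    final class OwnedClientContext implements AutoCloseable {

        // Stand-in for com.mongodb.client.MongoClient.
        interface Client {
            void close();
        }

        private final Client client;

        OwnedClientContext(Client client) {
            this.client = client; // one client per task context / source config
        }

        Client getClient() {
            return client; // scan and stream fetch tasks reuse this instance
        }

        @Override
        public void close() {
            try {
                client.close(); // closed together with the context; no shutdown hook needed
            } catch (Exception e) {
                System.err.println("Failed to close client: " + e.getMessage());
            }
        }

        public static void main(String[] args) {
            try (OwnedClientContext ctx =
                    new OwnedClientContext(() -> System.out.println("client closed"))) {
                System.out.println("fetch tasks would use " + ctx.getClient());
            }
        }
    }
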
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
index 32c69c463c..73f0c3ed61 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
@@ -65,7 +65,6 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createPartitionMap;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createSourceOffsetMap;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createWatermarkPartitionMap;
-import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getMongoCollection;
@Slf4j
@@ -102,7 +101,9 @@ public class MongodbScanFetchTask implements FetchTask<SourceSplitBase> {
lowWatermark)));
log.info("Snapshot step 2 - Snapshotting data");
- try (MongoCursor<RawBsonDocument> cursor = getSnapshotCursor(snapshotSplit, sourceConfig)) {
+ MongoClient mongoClient = taskContext.getMongoClient();
+ try (MongoCursor<RawBsonDocument> cursor =
+ getSnapshotCursor(snapshotSplit, sourceConfig, mongoClient)) {
while (cursor.hasNext()) {
checkTaskRunning();
BsonDocument valueDocument =
normalizeSnapshotDocument(collectionId, cursor.next());
@@ -163,8 +164,9 @@ public class MongodbScanFetchTask implements FetchTask<SourceSplitBase> {
@Nonnull
private MongoCursor<RawBsonDocument> getSnapshotCursor(
- @Nonnull SnapshotSplit snapshotSplit, MongodbSourceConfig sourceConfig) {
- MongoClient mongoClient = createMongoClient(sourceConfig);
+ @Nonnull SnapshotSplit snapshotSplit,
+ MongodbSourceConfig sourceConfig,
+ MongoClient mongoClient) {
MongoCollection<RawBsonDocument> collection =
getMongoCollection(mongoClient, snapshotSplit.getTableId(), RawBsonDocument.class);
BsonDocument startKey = (BsonDocument) snapshotSplit.getSplitStart()[1];
@@ -176,6 +178,7 @@ public class MongodbScanFetchTask implements FetchTask<SourceSplitBase> {
startKey,
endKey,
hint);
+
return collection
.find()
.min(startKey)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
index 0dfc91d826..1807f0a607 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
@@ -83,7 +83,6 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createWatermarkPartitionMap;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.currentBsonTimestamp;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
-import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getChangeStreamIterable;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getCurrentClusterTime;
@@ -94,6 +93,7 @@ public class MongodbStreamFetchTask implements FetchTask<SourceSplitBase> {
private volatile boolean taskRunning = false;
private MongodbSourceConfig sourceConfig;
+ private MongoClient mongoClient;
private final Time time = new SystemTime();
private boolean supportsStartAtOperationTime = true;
private boolean supportsStartAfter = true;
@@ -110,7 +110,7 @@ public class MongodbStreamFetchTask implements FetchTask<SourceSplitBase> {
ChangeStreamDescriptor descriptor = taskContext.getChangeStreamDescriptor();
ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();
- MongoClient mongoClient = createMongoClient(sourceConfig);
+ this.mongoClient = taskContext.getMongoClient();
MongoChangeStreamCursor<BsonDocument> changeStreamCursor =
openChangeStreamCursor(descriptor);
HeartbeatManager heartbeatManager = openHeartbeatManagerIfNeeded(changeStreamCursor);
@@ -260,7 +260,11 @@ public class MongodbStreamFetchTask implements FetchTask<SourceSplitBase> {
new ChangeStreamOffset(streamSplit.getStartupOffset().getOffset());
ChangeStreamIterable<Document> changeStreamIterable =
- getChangeStreamIterable(sourceConfig, changeStreamDescriptor);
+ getChangeStreamIterable(
+ mongoClient,
+ changeStreamDescriptor,
+ sourceConfig.getBatchSize(),
+ sourceConfig.isUpdateLookup());
BsonDocument resumeToken = offset.getResumeToken();
BsonTimestamp timestamp = offset.getTimestamp();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
index 4bac103032..923fe330f1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
@@ -377,7 +377,7 @@ public class MongodbUtils {
}
public static MongoClient createMongoClient(MongodbSourceConfig sourceConfig) {
- return MongodbClientProvider.INSTANCE.getOrCreateMongoClient(sourceConfig);
+ return MongodbClientProvider.INSTANCE.createMongoClient(sourceConfig);
}
public static @Nonnull ConnectionString buildConnectionString(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCMultiSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCMultiSourceIT.java
new file mode 100644
index 0000000000..840cab7f5d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCMultiSourceIT.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package mongodb;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.bson.Document;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.with;
+import static org.testcontainers.shaded.org.awaitility.Durations.TWO_SECONDS;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
+public class MongodbCDCMultiSourceIT extends TestSuiteBase implements TestResource {
+
+ protected static final String MONGODB_DATABASE_A = "inventory_a";
+ protected static final String MONGODB_COLLECTION_A = "products_a";
+ protected MongoDBContainer mongodbContainerA;
+ protected MongoClient clientA;
+
+ protected static final String MONGODB_DATABASE_B = "inventory_b";
+ protected static final String MONGODB_COLLECTION_B = "products_b";
+ protected MongoDBContainer mongodbContainerB;
+ protected MongoClient clientB;
+
+ private static final String MYSQL_HOST = "mysql_e2e";
+ private static final String MYSQL_USER_NAME = "st_user";
+ private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_DATABASE = "mongodb_cdc";
+ private static final String MYSQL_DRIVER_JAR =
+ "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer();
+ private final UniqueDatabase database = new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE);
+
+ private static MySqlContainer createMySqlContainer() {
+ MySqlContainer mySqlContainer = new MySqlContainer(MySqlVersion.V8_0);
+ mySqlContainer.withNetwork(NETWORK);
+ mySqlContainer.withNetworkAliases(MYSQL_HOST);
+ mySqlContainer.withDatabaseName(MYSQL_DATABASE);
+ mySqlContainer.withUsername(MYSQL_USER_NAME);
+ mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
+ mySqlContainer.withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image")));
+ mySqlContainer.setPortBindings(Collections.singletonList("3310:3306"));
+ return mySqlContainer;
+ }
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && wget "
+ + MYSQL_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ log.info("Starting MySQL container...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ log.info("MySQL container started");
+ database.createAndInitialize();
+ log.info("MySQL database initialized");
+
+ log.info("Starting MongoDB A container...");
+ mongodbContainerA =
+ new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.SHARD);
+ mongodbContainerA.withNetworkAliases("mongo0");
+ mongodbContainerA.setPortBindings(Collections.singletonList("27017:27017"));
+ mongodbContainerA.withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger("MongoDB-A-Docker-Image")));
+ Startables.deepStart(Stream.of(mongodbContainerA)).join();
+ log.info("MongoDB A container started");
+
+ log.info("Starting MongoDB B container...");
+ mongodbContainerB =
+ new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.SHARD);
+ mongodbContainerB.withNetworkAliases("mongo1");
+ mongodbContainerB.setPortBindings(Collections.singletonList("27018:27017"));
+ mongodbContainerB.withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger("MongoDB-B-Docker-Image")));
+ Startables.deepStart(Stream.of(mongodbContainerB)).join();
+ log.info("MongoDB B container started");
+
+ initMongoDBConnections();
+ initMongoDBData();
+ }
+
+ private void initMongoDBConnections() {
+ String ipAddressA = mongodbContainerA.getHost();
+ Integer portA = mongodbContainerA.getFirstMappedPort();
+ String urlA =
+ String.format(
+ "mongodb://%s:%s@%s:%d/%s?authSource=admin",
+ "superuser", "superpw", ipAddressA, portA, MONGODB_DATABASE_A);
+ clientA = MongoClients.create(urlA);
+ log.info("Connected to MongoDB A at {}:{}", ipAddressA, portA);
+
+ String ipAddressB = mongodbContainerB.getHost();
+ Integer portB = mongodbContainerB.getFirstMappedPort();
+ String urlB =
+ String.format(
+ "mongodb://%s:%s@%s:%d/%s?authSource=admin",
+ "superuser", "superpw", ipAddressB, portB, MONGODB_DATABASE_B);
+ clientB = MongoClients.create(urlB);
+ log.info("Connected to MongoDB B at {}:{}", ipAddressB, portB);
+ }
+
+ private void initMongoDBData() {
+ MongoCollection<Document> collectionA =
+ clientA.getDatabase(MONGODB_DATABASE_A).getCollection(MONGODB_COLLECTION_A);
+ collectionA.deleteMany(new Document());
+ List<Document> dataA = new ArrayList<>();
+ dataA.add(new Document("_id", "A001").append("name", "Product A1").append("price", 100));
+ dataA.add(new Document("_id", "A002").append("name", "Product A2").append("price", 200));
+ dataA.add(new Document("_id", "A003").append("name", "Product A3").append("price", 300));
+ collectionA.insertMany(dataA);
+ log.info("Inserted {} documents into MongoDB A", dataA.size());
+
+ MongoCollection<Document> collectionB =
+ clientB.getDatabase(MONGODB_DATABASE_B).getCollection(MONGODB_COLLECTION_B);
+ collectionB.deleteMany(new Document());
+ List<Document> dataB = new ArrayList<>();
+ dataB.add(new Document("_id", "B001").append("name", "Product B1").append("price", 150));
+ dataB.add(new Document("_id", "B002").append("name", "Product B2").append("price", 250));
+ dataB.add(new Document("_id", "B003").append("name", "Product B3").append("price", 350));
+ collectionB.insertMany(dataB);
+ log.info("Inserted {} documents into MongoDB B", dataB.size());
+ }
+
+ @TestTemplate
+ public void testMultipleMongoDBSourcesSequentially(TestContainer container) throws Exception {
+ createMySqlTables();
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mongodb_multi_source_a.conf");
+ } catch (Exception e) {
+ log.error("MongoDB A job exception: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ assertMySqlHasData("products_a", 3);
+ log.info("MongoDB A data verified in MySQL");
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mongodb_multi_source_b.conf");
+ } catch (Exception e) {
+ log.error("MongoDB B job exception: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ assertMySqlHasData("products_b", 3);
+ log.info("MongoDB B data verified in MySQL");
+ }
+
+ private void createMySqlTables() throws SQLException {
+ try (Connection connection = getJdbcConnection()) {
+ String createTableA =
+ "CREATE TABLE IF NOT EXISTS products_a ("
+ + "_id VARCHAR(255) PRIMARY KEY, "
+ + "name VARCHAR(255), "
+ + "price INT"
+ + ")";
+ connection.createStatement().execute(createTableA);
+ log.info("Created table products_a");
+
+ String createTableB =
+ "CREATE TABLE IF NOT EXISTS products_b ("
+ + "_id VARCHAR(255) PRIMARY KEY, "
+ + "name VARCHAR(255), "
+ + "price INT"
+ + ")";
+ connection.createStatement().execute(createTableB);
+ log.info("Created table products_b");
+ }
+ }
+
+ private void assertMySqlHasData(String tableName, int expectedCount) {
+ with().pollInterval(TWO_SECONDS)
+ .pollDelay(500, TimeUnit.MILLISECONDS)
+ .await()
+ .atMost(5, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ try (Connection connection = getJdbcConnection()) {
+ String sql = String.format("SELECT COUNT(*) FROM %s", tableName);
+ try (ResultSet rs =
+ connection.createStatement().executeQuery(sql)) {
+ if (rs.next()) {
+ int count = rs.getInt(1);
+ log.info("Table {} has {} rows", tableName, count);
+ Assertions.assertEquals(
+ expectedCount,
+ count,
+ String.format(
+ "Expected %d rows in %s but found %d",
+ expectedCount, tableName, count));
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (clientA != null) {
+ clientA.close();
+ }
+
+ if (clientB != null) {
+ clientB.close();
+ }
+
+ if (mongodbContainerA != null) {
+ mongodbContainerA.stop();
+ }
+
+ if (mongodbContainerB != null) {
+ mongodbContainerB.stop();
+ }
+
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_a.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_a.conf
new file mode 100644
index 0000000000..f7c0c1d05b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_a.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo0:27017"
+ database = ["inventory_a"]
+ collection = ["inventory_a.products_a"]
+ username = superuser
+ password = superpw
+ schema = {
+ fields {
+ "_id": string
+ "name": string
+ "price": int
+ }
+ }
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
+ driver = "com.mysql.cj.jdbc.Driver"
+ username = "st_user"
+ password = "seatunnel"
+ generate_sink_sql = true
+ database = mongodb_cdc
+ table = "products_a"
+ primary_keys = ["_id"]
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_b.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_b.conf
new file mode 100644
index 0000000000..d1f55dab0d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_source_b.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo1:27017"
+ database = ["inventory_b"]
+ collection = ["inventory_b.products_b"]
+ username = superuser
+ password = superpw
+ schema = {
+ fields {
+ "_id": string
+ "name": string
+ "price": int
+ }
+ }
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
+ driver = "com.mysql.cj.jdbc.Driver"
+ username = "st_user"
+ password = "seatunnel"
+ generate_sink_sql = true
+ database = mongodb_cdc
+ table = "products_b"
+ primary_keys = ["_id"]
+ }
+}