This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4c40c9a845 [Feature][Connector-V2] Mongodb Sink Support Multi Table 
(#9958)
4c40c9a845 is described below

commit 4c40c9a84527d4d06f660ebb0ba8d945e2796511
Author: 老王 <[email protected]>
AuthorDate: Fri Oct 31 16:18:00 2025 +0800

    [Feature][Connector-V2] Mongodb Sink Support Multi Table (#9958)
---
 docs/en/connector-v2/sink/MongoDB.md               |  29 +--
 docs/zh/connector-v2/sink/MongoDB.md               | 210 +++++++++++++++++++++
 .../seatunnel/mongodb/sink/MongodbSink.java        |   7 +-
 .../seatunnel/mongodb/sink/MongodbSinkFactory.java |   4 +-
 .../seatunnel/mongodb/sink/MongodbWriter.java      |   5 +-
 .../e2e/connector/v2/mongodb/MongodbIT.java        |  20 ++
 .../fake_source_to_mongodb_multiple_table.conf     |  92 +++++++++
 7 files changed, 348 insertions(+), 19 deletions(-)

diff --git a/docs/en/connector-v2/sink/MongoDB.md 
b/docs/en/connector-v2/sink/MongoDB.md
index d4f702bd17..feb1918c0f 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -14,6 +14,7 @@ import ChangeLog from '../changelog/connector-mongodb.md';
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 **Tips**
 
@@ -62,20 +63,20 @@ The following table lists the field data type mapping from 
MongoDB BSON type to
 
 ## Sink Options
 
-| Name                  | Type     | Required | Default | Description          
                                                                                
                                                                                
                                                  |
-|-----------------------|----------|----------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| uri                   | String   | Yes      | -      | The MongoDB standard 
connection uri. eg. 
mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true.
                                                                                
                           |
-| database              | String   | Yes      | -      | The name of MongoDB 
database to read or write.                                                      
                                                                                
                                                   |
-| collection            | String   | Yes      | -      | The name of MongoDB 
collection to read or write.                                                    
                                                                                
                                                   |
-| buffer-flush.max-rows | String   | No       | 1000   | Specifies the maximum 
number of buffered rows per batch request.                                      
                                                                                
                                                 |
-| buffer-flush.interval | String   | No       | 30000  | Specifies the maximum 
interval of buffered rows per batch request, the unit is millisecond.           
                                                                                
                                                 |
-| retry.max             | String   | No       | 3      | Specifies the max 
number of retry if writing records to database failed.                          
                                                                                
                                                     |
-| retry.interval        | Duration | No       | 1000   | Specifies the retry 
time interval if writing records to database failed, the unit is millisecond.   
                                                                                
                                                   |
-| upsert-enable         | Boolean  | No       | false  | Whether to write 
documents via upsert mode.                                                      
                                                                                
                                                      |
-| primary-key           | List     | No       | -      | The primary keys for 
upsert/update. Keys are in `["id","name",...]` format for properties.           
                                                                                
                                                  |
-| transaction           | Boolean  | No       | false  | Whether to use 
transactions in MongoSink (requires MongoDB 4.2+).                              
                                                                                
                                                        |
-| common-options        |          | No       | -      | Source plugin common 
parameters, please refer to [Source Common Options](../sink-common-options.md) 
for details                                                                     
                                                   |
-| data_save_mode        | String   | No       | APPEND_DATA       | The data 
saving mode of mongodb,Option introduction,`DROP_DATA`:The collection will be 
cleared before inserting data;`APPEND_DATA`:Append data 
;`ERROR_WHEN_DATA_EXISTS`:An error will be reported if there is data in the 
collection. |
+| Name                  | Type     | Required | Default | Description          
                                                                                
                                                                                
                                                                                
 |
+|-----------------------|----------|----------|--------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| uri                   | String   | Yes      | -      | The MongoDB standard 
connection uri. eg. 
mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true.
                                                                                
                                                          |
+| database              | String   | Yes      | -      | The name of the 
MongoDB database to read or write to. When configuring multiple tables at the 
source, you can use `${database_name}` as a placeholder, for example: `database 
= "${database_name}_test_database"` .                                           
          |
+| collection            | String   | Yes      | -      | The name of the 
MongoDB collection to read or write. When configuring multiple tables at the 
source, you can use `${database_name}`, `${schema_name}`, `${table_name}` as 
placeholders, for example: `collection = 
"${database_name}_${schema_name}_${table_name}_check"` |
+| buffer-flush.max-rows | String   | No       | 1000   | Specifies the maximum 
number of buffered rows per batch request.                                      
                                                                                
                                                                                
|
+| buffer-flush.interval | String   | No       | 30000  | Specifies the maximum 
interval of buffered rows per batch request, the unit is millisecond.           
                                                                                
                                                                                
|
+| retry.max             | String   | No       | 3      | Specifies the maximum 
number of retries if writing records to the database fails.                     
                                                                                
                                                                                
    |
+| retry.interval        | Duration | No       | 1000   | Specifies the retry 
time interval if writing records to database failed, the unit is millisecond.   
                                                                                
                                                                                
  |
+| upsert-enable         | Boolean  | No       | false  | Whether to write 
documents via upsert mode.                                                      
                                                                                
                                                                                
     |
+| primary-key           | List     | No       | -      | The primary keys for 
upsert/update. Keys are in `["id","name",...]` format for properties.           
                                                                                
                                                                                
 |
+| transaction           | Boolean  | No       | false  | Whether to use 
transactions in MongoSink (requires MongoDB 4.2+).                              
                                                                                
                                                                                
       |
+| common-options        |          | No       | -      | Sink plugin common 
parameters, please refer to [Sink Common Options](../sink-common-options.md) 
for details                                                                     
                                                                                
  |
+| data_save_mode        | String   | No       | APPEND_DATA       | The data 
saving mode of MongoDB. Options: `DROP_DATA`: the collection will be 
cleared before inserting data; `APPEND_DATA`: append data; 
`ERROR_WHEN_DATA_EXISTS`: an error will be reported if there is data in the 
collection.                                |
 
 
 ### Tips
diff --git a/docs/zh/connector-v2/sink/MongoDB.md 
b/docs/zh/connector-v2/sink/MongoDB.md
new file mode 100644
index 0000000000..a0666dfdee
--- /dev/null
+++ b/docs/zh/connector-v2/sink/MongoDB.md
@@ -0,0 +1,210 @@
+import ChangeLog from '../changelog/connector-mongodb.md';
+
+# MongoDB
+
+> MongoDB 数据接收(Sink)连接器
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 关键特性
+
+- [x] [exactly-once 精准一次写入](../../concept/connector-v2-features.md)
+- [x] [CDC(变更数据捕获)](../../concept/connector-v2-features.md)
+- [x] [支持多表写入](../../concept/connector-v2-features.md)
+
+**提示**
+
+> 1. 如果希望使用 CDC 写入功能,建议启用 `upsert-enable` 配置项。
+
+## 介绍
+
+MongoDB 连接器提供从 MongoDB 读取数据以及向 MongoDB 写入数据的能力。  
+本文档将介绍如何配置 MongoDB 连接器,以便执行向 MongoDB 写入数据的任务。
+
+## 支持的数据源信息
+
+要使用 MongoDB 连接器,需要以下依赖。  
+可通过 `install-plugin.sh` 下载,或从 Maven 中央仓库获取。
+
+| 数据源 | 支持版本 | 依赖 |
+|---------|------------|---------|
+| MongoDB | 通用版本 | 
[下载](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-mongodb) 
|
+
+## 数据类型映射
+
+以下表格展示了 MongoDB BSON 类型与 SeaTunnel 数据类型之间的映射关系。
+
+| SeaTunnel 数据类型 | MongoDB BSON 类型 |
+|--------------------|-------------------|
+| STRING             | ObjectId          |
+| STRING             | String            |
+| BOOLEAN            | Boolean           |
+| BINARY             | Binary            |
+| INTEGER            | Int32             |
+| TINYINT            | Int32             |
+| SMALLINT           | Int32             |
+| BIGINT             | Int64             |
+| DOUBLE             | Double            |
+| FLOAT              | Double            |
+| DECIMAL            | Decimal128        |
+| Date               | Date              |
+| Timestamp          | Timestamp / Date  |
+| ROW                | Object            |
+| ARRAY              | Array             |
+
+**提示**
+
+> 1. 使用 SeaTunnel 将 `Date` 和 `Timestamp` 类型写入 MongoDB 时,MongoDB 中都会生成 `Date` 
类型字段,但精度不同:SeaTunnel 的 `Date` 类型精度为秒,`Timestamp` 类型精度为毫秒。<br/>
+> 2. 当使用 `DECIMAL` 类型时,最大精度不能超过 34 位,也就是说应使用 `decimal(34, 18)`。
+
+## Sink 参数说明
+
+| 参数名称              | 类型     | 是否必填 | 默认值 | 说明 |
+|-----------------------|----------|----------|--------|------|
+| uri                   | String   | 是       | -      | MongoDB 标准连接 
URI,例如:`mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true`。
 |
+| database              | String   | 是       | -      | 要读取或写入的 MongoDB 
数据库名称。配置多表同步时,可使用占位符 `${database_name}`,例如:`database = 
"${database_name}_test_database"`。 |
+| collection            | String   | 是       | -      | 要读取或写入的 MongoDB 
集合名称。配置多表同步时,可使用 `${table_name}`、`${schema_name}` 等占位符,例如:`collection = 
"${database_name}_${schema_name}_${table_name}_check"`。 |
+| buffer-flush.max-rows | String   | 否       | 1000   | 每次批量写入请求的最大缓存行数。 |
+| buffer-flush.interval | String   | 否       | 30000  | 批量写入的最大时间间隔(毫秒)。 |
+| retry.max             | String   | 否       | 3      | 写入失败时的最大重试次数。 |
+| retry.interval        | Duration | 否       | 1000   | 写入失败后的重试间隔时间(毫秒)。 |
+| upsert-enable         | Boolean  | 否       | false  | 是否启用 upsert 模式进行写入。 |
+| primary-key           | List     | 否       | -      | 用于 upsert 或更新操作的主键,格式为 
`["id","name",...]`。 |
+| transaction           | Boolean  | 否       | false  | 是否在 MongoSink 中使用事务(需要 
MongoDB 4.2+)。 |
+| common-options        | -        | 否       | -      | 通用 Sink 插件参数,详见 [Sink 
Common Options](../sink-common-options.md)。 |
+| data_save_mode        | String   | 否       | APPEND_DATA | 数据写入模式:<br/>- 
`DROP_DATA`: 插入数据前清空集合;<br/>- `APPEND_DATA`: 追加数据;<br/>- 
`ERROR_WHEN_DATA_EXISTS`: 如果集合已有数据则报错。 |
+
+### 提示
+
+> 1. MongoDB Sink 
连接器的数据刷新逻辑由以下三个参数共同控制:`buffer-flush.max-rows`、`buffer-flush.interval` 和 
`checkpoint.interval`。  
+     > 任一条件满足时,都会触发数据刷写。<br/>
+> 2. 兼容历史参数 `upsert-key`。若已设置 `upsert-key`,请勿同时设置 `primary-key`。
+
+## 如何创建 MongoDB 数据同步任务
+
+下面示例展示了一个将随机生成的数据写入 MongoDB 的数据同步任务:
+
+```bash
+# 设置作业的基本配置
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
+
+source {
+  FakeSource {
+      row.num = 2
+      bigint.min = 0
+      bigint.max = 10000000
+      split.num = 1
+      split.read-interval = 300
+      schema {
+        fields {
+          c_bigint = bigint
+        }
+      }
+    }
+}
+
+sink {
+  MongoDB {
+    uri = mongodb://user:[email protected]:27017
+    database = "test"
+    collection = "test"
+  }
+}
+```
+
+## 参数详解
+
+### MongoDB 数据库连接 URI 示例
+
+无认证的单节点连接:
+
+```bash
+mongodb://127.0.0.1:27017/mydb
+```
+
+副本集连接:
+
+```bash
+mongodb://127.0.0.1:27017/mydb?replicaSet=xxx
+```
+
+带认证的副本集连接:
+
+```bash
+mongodb://admin:[email protected]:27017/mydb?replicaSet=xxx&authSource=admin
+```
+
+多节点副本集连接:
+
+```bash
+mongodb://127.0.0.1:27017,127.0.0.2:27017,127.0.0.3:27017/mydb?replicaSet=xxx
+```
+
+分片集群连接:
+
+```bash
+mongodb://127.0.0.1:27017/mydb
+```
+
+多个 mongos 节点连接:
+
+```bash
+mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
+```
+
+注意:URI 中的用户名与密码在拼接前必须进行 URL 编码。
+
+### Buffer Flush 示例
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:[email protected]:27017"
+    database = "test_db"
+    collection = "users"
+    buffer-flush.max-rows = 2000
+    buffer-flush.interval = 1000
+  }
+}
+```
+
+### 为什么不推荐频繁使用事务?
+
+虽然 MongoDB 自 4.2 版本起已完全支持多文档事务,但这并不意味着所有场景都应使用。  
+事务意味着加锁、节点协调、额外开销和性能损耗。  
+设计系统时应遵循的原则是:**能不用事务就不要用事务**。  
+合理的系统设计可以在大多数情况下避免对事务的依赖。
+
+### 幂等写入(Idempotent Writes)
+
+通过定义明确的主键并启用 `upsert` 模式,可以实现精准一次写入(exactly-once)语义。
+
+当配置中定义了 `primary-key` 且启用了 `upsert-enable`,MongoDB Sink 将使用 Upsert 语义而非普通 
INSERT 语句。  
+SeaTunnel 会将定义的主键作为 MongoDB 的复合主键,在 Upsert 模式下进行写入,以确保幂等性。
+
+若作业在运行过程中失败,SeaTunnel 会从上一个成功的 checkpoint 恢复并重新处理数据,这可能导致重复数据。  
+强烈建议启用 Upsert 模式,以避免主键冲突或重复插入。
+
+```bash
+sink {
+  MongoDB {
+    uri = "mongodb://user:[email protected]:27017"
+    database = "test_db"
+    collection = "users"
+    upsert-enable = true
+    primary-key = ["name","status"]
+  }
+}
+```
+
+## 更新日志
+
+<ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index fe1962b538..617f77a207 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -47,7 +48,8 @@ import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbCo
 public class MongodbSink
         implements SeaTunnelSink<
                         SeaTunnelRow, DocumentBulk, MongodbCommitInfo, 
MongodbAggregatedCommitInfo>,
-                SupportSaveMode {
+                SupportSaveMode,
+                SupportMultiTableSink {
 
     private final MongodbWriterOptions options;
 
@@ -64,8 +66,7 @@ public class MongodbSink
     }
 
     @Override
-    public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> 
createWriter(
-            SinkWriter.Context context) {
+    public MongodbWriter createWriter(SinkWriter.Context context) {
         return new MongodbWriter(
                 new RowDataDocumentSerializer(
                         
RowDataToBsonConverters.createConverter(catalogTable.getSeaTunnelRowType()),
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index 43569e7d36..b1157d49fc 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
@@ -49,7 +50,8 @@ public class MongodbSinkFactory implements TableSinkFactory {
                         MongodbConfig.RETRY_INTERVAL,
                         MongodbConfig.UPSERT_ENABLE,
                         MongodbConfig.PRIMARY_KEY,
-                        MongodbConfig.DATA_SAVE_MODE)
+                        MongodbConfig.DATA_SAVE_MODE,
+                        SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
index ae682f70de..63ab499ccf 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
@@ -47,7 +48,9 @@ import java.util.stream.IntStream;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED;
 
 @Slf4j
-public class MongodbWriter implements SinkWriter<SeaTunnelRow, 
MongodbCommitInfo, DocumentBulk> {
+public class MongodbWriter
+        implements SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk>,
+                SupportMultiTableSinkWriter<Void> {
 
     private MongodbClientProvider collectionProvider;
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 4a721333e9..cea432a012 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -266,6 +266,26 @@ public class MongodbIT extends AbstractMongodbIT {
         clearData(MONGODB_DOUBLE_TABLE_RESULT);
     }
 
+    @TestTemplate
+    public void testFakeSourceToMongodbMultipleTable(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult insertResult =
+                
container.executeJob("/fake_source_to_mongodb_multiple_table.conf");
+        Assertions.assertEquals(0, insertResult.getExitCode(), 
insertResult.getStderr());
+        String collectionOneStr = "testDatabase1_testSchema1_testTable1_check";
+        MongoCollection<BsonDocument> collectionOne =
+                client.getDatabase(MONGODB_DATABASE)
+                        .getCollection(collectionOneStr, BsonDocument.class);
+        Assertions.assertEquals(1, collectionOne.countDocuments());
+        String collectionTwoStr = "testDatabase2_testSchema2_testTable2_check";
+        MongoCollection<BsonDocument> collectionTwo =
+                client.getDatabase(MONGODB_DATABASE)
+                        .getCollection(collectionTwoStr, BsonDocument.class);
+        Assertions.assertEquals(1, collectionTwo.countDocuments());
+        clearData(collectionOneStr);
+        clearData(collectionTwoStr);
+    }
+
     @SneakyThrows
     @TestTemplate
     public void testDropDataSaveMode(TestContainer container) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb_multiple_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb_multiple_table.conf
new file mode 100644
index 0000000000..5c65420b2f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb_multiple_table.conf
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    plugin_output = "mongodb_table"
+    tables_configs = [
+       {
+        schema = {
+         table = "testDatabase1.testSchema1.testTable1"
+         fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "testDatabase2.testSchema2.testTable2"
+              fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+    database = "test_db"
+    collection = "${database_name}_${schema_name}_${table_name}_check"
+  }
+}
\ No newline at end of file

Reply via email to