This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4c40c9a845 [Feature][Connector-V2] Mongodb Sink Support Multi Table
(#9958)
4c40c9a845 is described below
commit 4c40c9a84527d4d06f660ebb0ba8d945e2796511
Author: 老王 <[email protected]>
AuthorDate: Fri Oct 31 16:18:00 2025 +0800
[Feature][Connector-V2] Mongodb Sink Support Multi Table (#9958)
---
docs/en/connector-v2/sink/MongoDB.md | 29 +--
docs/zh/connector-v2/sink/MongoDB.md | 210 +++++++++++++++++++++
.../seatunnel/mongodb/sink/MongodbSink.java | 7 +-
.../seatunnel/mongodb/sink/MongodbSinkFactory.java | 4 +-
.../seatunnel/mongodb/sink/MongodbWriter.java | 5 +-
.../e2e/connector/v2/mongodb/MongodbIT.java | 20 ++
.../fake_source_to_mongodb_multiple_table.conf | 92 +++++++++
7 files changed, 348 insertions(+), 19 deletions(-)
diff --git a/docs/en/connector-v2/sink/MongoDB.md
b/docs/en/connector-v2/sink/MongoDB.md
index d4f702bd17..feb1918c0f 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -14,6 +14,7 @@ import ChangeLog from '../changelog/connector-mongodb.md';
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
**Tips**
@@ -62,20 +63,20 @@ The following table lists the field data type mapping from
MongoDB BSON type to
## Sink Options
-| Name | Type | Required | Default | Description
|
-|-----------------------|----------|----------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| uri | String | Yes | - | The MongoDB standard
connection uri. eg.
mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true.
|
-| database | String | Yes | - | The name of MongoDB
database to read or write.
|
-| collection | String | Yes | - | The name of MongoDB
collection to read or write.
|
-| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum
number of buffered rows per batch request.
|
-| buffer-flush.interval | String | No | 30000 | Specifies the maximum
interval of buffered rows per batch request, the unit is millisecond.
|
-| retry.max | String | No | 3 | Specifies the max
number of retry if writing records to database failed.
|
-| retry.interval | Duration | No | 1000 | Specifies the retry
time interval if writing records to database failed, the unit is millisecond.
|
-| upsert-enable | Boolean | No | false | Whether to write
documents via upsert mode.
|
-| primary-key | List | No | - | The primary keys for
upsert/update. Keys are in `["id","name",...]` format for properties.
|
-| transaction | Boolean | No | false | Whether to use
transactions in MongoSink (requires MongoDB 4.2+).
|
-| common-options | | No | - | Source plugin common
parameters, please refer to [Source Common Options](../sink-common-options.md)
for details
|
-| data_save_mode | String | No | APPEND_DATA | The data
saving mode of mongodb,Option introduction,`DROP_DATA`:The collection will be
cleared before inserting data;`APPEND_DATA`:Append data
;`ERROR_WHEN_DATA_EXISTS`:An error will be reported if there is data in the
collection. |
+| Name | Type | Required | Default | Description
|
+|-----------------------|----------|----------|--------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| uri | String | Yes | - | The MongoDB standard
connection uri. eg.
mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true.
|
+| database | String | Yes | - | The name of the
MongoDB database to read or write to. When configuring multiple tables at the
source, you can use `${database_name}` as a placeholder, for example: `database
= "${database_name}_test_database"` .
|
+| collection | String | Yes | - | The name of the
MongoDB collection to read or write. When configuring multiple tables at the
source end, you can use `${database_name}`,`${schema_name}`,`${table_name}` as
placeholders, for example: `collection =
"${database_name}_${schema_name}_${table_name}_check"` |
+| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum
number of buffered rows per batch request.
|
+| buffer-flush.interval | String | No | 30000 | Specifies the maximum
interval of buffered rows per batch request, the unit is millisecond.
|
+| retry.max | String | No | 3 | Specifies the max
number of retry if writing records to database failed.
|
+| retry.interval | Duration | No | 1000 | Specifies the retry
time interval if writing records to database failed, the unit is millisecond.
|
+| upsert-enable | Boolean | No | false | Whether to write
documents via upsert mode.
|
+| primary-key | List | No | - | The primary keys for
upsert/update. Keys are in `["id","name",...]` format for properties.
|
+| transaction | Boolean | No | false | Whether to use
transactions in MongoSink (requires MongoDB 4.2+).
|
+| common-options | | No | - | Source plugin common
parameters, please refer to [Source Common Options](../sink-common-options.md)
for details
|
+| data_save_mode | String | No | APPEND_DATA | The data
saving mode of mongodb,Option introduction,`DROP_DATA`:The collection will be
cleared before inserting data;`APPEND_DATA`:Append data
;`ERROR_WHEN_DATA_EXISTS`:An error will be reported if there is data in the
collection. |
### Tips
diff --git a/docs/zh/connector-v2/sink/MongoDB.md
b/docs/zh/connector-v2/sink/MongoDB.md
new file mode 100644
index 0000000000..a0666dfdee
--- /dev/null
+++ b/docs/zh/connector-v2/sink/MongoDB.md
@@ -0,0 +1,210 @@
+import ChangeLog from '../changelog/connector-mongodb.md';
+
+# MongoDB
+
+> MongoDB 数据接收(Sink)连接器
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 关键特性
+
+- [x] [exactly-once 精准一次写入](../../concept/connector-v2-features.md)
+- [x] [CDC(变更数据捕获)](../../concept/connector-v2-features.md)
+- [x] [支持多表写入](../../concept/connector-v2-features.md)
+
+**提示**
+
+> 1. 如果希望使用 CDC 写入功能,建议启用 `upsert-enable` 配置项。
+
+## 介绍
+
+MongoDB 连接器提供从 MongoDB 读取数据以及向 MongoDB 写入数据的能力。
+本文档将介绍如何配置 MongoDB 连接器,以便执行向 MongoDB 写入数据的任务。
+
+## 支持的数据源信息
+
+要使用 MongoDB 连接器,需要以下依赖。
+可通过 `install-plugin.sh` 下载,或从 Maven 中央仓库获取。
+
+| 数据源 | 支持版本 | 依赖 |
+|---------|------------|---------|
+| MongoDB | 通用版本 |
[下载](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-mongodb)
|
+
+## 数据类型映射
+
+以下表格展示了 MongoDB BSON 类型与 SeaTunnel 数据类型之间的映射关系。
+
+| SeaTunnel 数据类型 | MongoDB BSON 类型 |
+|--------------------|-------------------|
+| STRING | ObjectId |
+| STRING | String |
+| BOOLEAN | Boolean |
+| BINARY | Binary |
+| INTEGER | Int32 |
+| TINYINT | Int32 |
+| SMALLINT | Int32 |
+| BIGINT | Int64 |
+| DOUBLE | Double |
+| FLOAT | Double |
+| DECIMAL | Decimal128 |
+| Date | Date |
+| Timestamp | Timestamp / Date |
+| ROW | Object |
+| ARRAY | Array |
+
+**提示**
+
+> 1. 使用 SeaTunnel 将 `Date` 和 `Timestamp` 类型写入 MongoDB 时,MongoDB 中都会生成 `Date`
类型字段,但精度不同:SeaTunnel 的 `Date` 类型精度为秒,`Timestamp` 类型精度为毫秒。<br/>
+> 2. 当使用 `DECIMAL` 类型时,最大精度不能超过 34 位,也就是说应使用 `decimal(34, 18)`。
+
+## Sink 参数说明
+
+| 参数名称 | 类型 | 是否必填 | 默认值 | 说明 |
+|-----------------------|----------|----------|--------|------|
+| uri | String | 是 | - | MongoDB 标准连接
URI,例如:`mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true`。
|
+| database | String | 是 | - | 要读取或写入的 MongoDB
数据库名称。配置多表同步时,可使用占位符 `${database_name}`,例如:`database =
"${database_name}_test_database"`。 |
+| collection | String | 是 | - | 要读取或写入的 MongoDB
集合名称。配置多表同步时,可使用 `${table_name}`、`${schema_name}` 等占位符,例如:`collection =
"${database_name}_${schema_name}_${table_name}_check"`。 |
+| buffer-flush.max-rows | String | 否 | 1000 | 每次批量写入请求的最大缓存行数。 |
+| buffer-flush.interval | String | 否 | 30000 | 批量写入的最大时间间隔(毫秒)。 |
+| retry.max | String | 否 | 3 | 写入失败时的最大重试次数。 |
+| retry.interval | Duration | 否 | 1000 | 写入失败后的重试间隔时间(毫秒)。 |
+| upsert-enable | Boolean | 否 | false | 是否启用 upsert 模式进行写入。 |
+| primary-key | List | 否 | - | 用于 upsert 或更新操作的主键,格式为
`["id","name",...]`。 |
+| transaction | Boolean | 否 | false | 是否在 MongoSink 中使用事务(需要
MongoDB 4.2+)。 |
+| common-options | - | 否 | - | 通用 Sink 插件参数,详见 [Sink
Common Options](../sink-common-options.md)。 |
+| data_save_mode | String | 否 | APPEND_DATA | 数据写入模式:<br/>-
`DROP_DATA`: 插入数据前清空集合;<br/>- `APPEND_DATA`: 追加数据;<br/>-
`ERROR_WHEN_DATA_EXISTS`: 如果集合已有数据则报错。 |
+
+### 提示
+
+> 1. MongoDB Sink
连接器的数据刷新逻辑由以下三个参数共同控制:`buffer-flush.max-rows`、`buffer-flush.interval` 和
`checkpoint.interval`。
+ > 任一条件满足时,都会触发数据刷写。<br/>
+> 2. 兼容历史参数 `upsert-key`。若已设置 `upsert-key`,请勿同时设置 `primary-key`。
+
+## 如何创建 MongoDB 数据同步任务
+
+下面示例展示了一个将随机生成的数据写入 MongoDB 的数据同步任务:
+
+```bash
+# 设置作业的基本配置
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 1000
+}
+
+source {
+ FakeSource {
+ row.num = 2
+ bigint.min = 0
+ bigint.max = 10000000
+ split.num = 1
+ split.read-interval = 300
+ schema {
+ fields {
+ c_bigint = bigint
+ }
+ }
+ }
+}
+
+sink {
+ MongoDB {
+ uri = mongodb://user:[email protected]:27017
+ database = "test"
+ collection = "test"
+ }
+}
+```
+
+## 参数详解
+
+### MongoDB 数据库连接 URI 示例
+
+无认证的单节点连接:
+
+```bash
+mongodb://127.0.0.0:27017/mydb
+```
+
+副本集连接:
+
+```bash
+mongodb://127.0.0.0:27017/mydb?replicaSet=xxx
+```
+
+带认证的副本集连接:
+
+```bash
+mongodb://admin:[email protected]:27017/mydb?replicaSet=xxx&authSource=admin
+```
+
+多节点副本集连接:
+
+```bash
+mongodb://127.0.0.1:27017,127.0.0.2:27017,127.0.0.3:27017/mydb?replicaSet=xxx
+```
+
+分片集群连接:
+
+```bash
+mongodb://127.0.0.0:27017/mydb
+```
+
+多个 mongos 节点连接:
+
+```bash
+mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
+```
+
+注意:URI 中的用户名与密码在拼接前必须进行 URL 编码。
+
+### Buffer Flush 示例
+
+```bash
+sink {
+ MongoDB {
+ uri = "mongodb://user:[email protected]:27017"
+ database = "test_db"
+ collection = "users"
+ buffer-flush.max-rows = 2000
+ buffer-flush.interval = 1000
+ }
+}
+```
+
+### 为什么不推荐频繁使用事务?
+
+虽然 MongoDB 自 4.2 版本起已完全支持多文档事务,但这并不意味着所有场景都应使用。
+事务意味着加锁、节点协调、额外开销和性能损耗。
+设计系统时应遵循的原则是:**能不用事务就不要用事务**。
+合理的系统设计可以在大多数情况下避免对事务的依赖。
+
+### 幂等写入(Idempotent Writes)
+
+通过定义明确的主键并启用 `upsert` 模式,可以实现精准一次写入(exactly-once)语义。
+
+当配置中定义了 `primary-key` 且启用了 `upsert-enable`,MongoDB Sink 将使用 Upsert 语义而非普通
INSERT 语句。
+SeaTunnel 会将定义的主键作为 MongoDB 的复合主键,在 Upsert 模式下进行写入,以确保幂等性。
+
+若作业在运行过程中失败,SeaTunnel 会从上一个成功的 checkpoint 恢复并重新处理数据,这可能导致重复数据。
+强烈建议启用 Upsert 模式,以避免主键冲突或重复插入。
+
+```bash
+sink {
+ MongoDB {
+ uri = "mongodb://user:[email protected]:27017"
+ database = "test_db"
+ collection = "users"
+ upsert-enable = true
+ primary-key = ["name","status"]
+ }
+}
+```
+
+## 更新日志
+
+<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index fe1962b538..617f77a207 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -47,7 +48,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbCo
public class MongodbSink
implements SeaTunnelSink<
SeaTunnelRow, DocumentBulk, MongodbCommitInfo,
MongodbAggregatedCommitInfo>,
- SupportSaveMode {
+ SupportSaveMode,
+ SupportMultiTableSink {
private final MongodbWriterOptions options;
@@ -64,8 +66,7 @@ public class MongodbSink
}
@Override
- public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk>
createWriter(
- SinkWriter.Context context) {
+ public MongodbWriter createWriter(SinkWriter.Context context) {
return new MongodbWriter(
new RowDataDocumentSerializer(
RowDataToBsonConverters.createConverter(catalogTable.getSeaTunnelRowType()),
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index 43569e7d36..b1157d49fc 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
@@ -49,7 +50,8 @@ public class MongodbSinkFactory implements TableSinkFactory {
MongodbConfig.RETRY_INTERVAL,
MongodbConfig.UPSERT_ENABLE,
MongodbConfig.PRIMARY_KEY,
- MongodbConfig.DATA_SAVE_MODE)
+ MongodbConfig.DATA_SAVE_MODE,
+ SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
index ae682f70de..63ab499ccf 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
@@ -47,7 +48,9 @@ import java.util.stream.IntStream;
import static
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED;
@Slf4j
-public class MongodbWriter implements SinkWriter<SeaTunnelRow,
MongodbCommitInfo, DocumentBulk> {
+public class MongodbWriter
+ implements SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk>,
+ SupportMultiTableSinkWriter<Void> {
private MongodbClientProvider collectionProvider;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 4a721333e9..cea432a012 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -266,6 +266,26 @@ public class MongodbIT extends AbstractMongodbIT {
clearData(MONGODB_DOUBLE_TABLE_RESULT);
}
+ @TestTemplate
+ public void testFakeSourceToMongodbMultipleTable(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult insertResult =
+
container.executeJob("/fake_source_to_mongodb_multiple_table.conf");
+ Assertions.assertEquals(0, insertResult.getExitCode(),
insertResult.getStderr());
+ String collectionOneStr = "testDatabase1_testSchema1_testTable1_check";
+ MongoCollection<BsonDocument> collectionOne =
+ client.getDatabase(MONGODB_DATABASE)
+ .getCollection(collectionOneStr, BsonDocument.class);
+ Assertions.assertEquals(1, collectionOne.countDocuments());
+ String collectionTwoStr = "testDatabase2_testSchema2_testTable2_check";
+ MongoCollection<BsonDocument> collectionTwo =
+ client.getDatabase(MONGODB_DATABASE)
+ .getCollection(collectionTwoStr, BsonDocument.class);
+ Assertions.assertEquals(1, collectionTwo.countDocuments());
+ clearData(collectionOneStr);
+ clearData(collectionTwoStr);
+ }
+
@SneakyThrows
@TestTemplate
public void testDropDataSaveMode(TestContainer container) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb_multiple_table.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb_multiple_table.conf
new file mode 100644
index 0000000000..5c65420b2f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb_multiple_table.conf
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ plugin_output = "mongodb_table"
+ tables_configs = [
+ {
+ schema = {
+ table = "testDatabase1.testSchema1.testTable1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "testDatabase2.testSchema2.testTable2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+ database = "test_db"
+ collection = "${database_name}_${schema_name}_${table_name}_check"
+ }
+}
\ No newline at end of file