This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 51c28a338 [Feature][Connector-V2] Add mongodb connector sink (#2694)
51c28a338 is described below

commit 51c28a3387fa99ebea9118034cd0d99d3c82578a
Author: ChunFu Wu <[email protected]>
AuthorDate: Sun Sep 11 16:33:08 2022 +0800

    [Feature][Connector-V2] Add mongodb connector sink (#2694)
    
    * [Feature][Connector-V2] Add mongodb connector sink
    
    * Add license header
    
    * Add spark mongodb sink e2e
    
    * Add spark mongodb sink e2e
    
    * Fix
    
    * Fix
    
    * Fix
---
 docs/en/connector-v2/sink/MongoDB.md               |  46 +++++++
 plugin-mapping.properties                          |   1 +
 .../seatunnel/mongodb/sink/MongodbSink.java        |  80 ++++++++++++
 .../seatunnel/mongodb/sink/MongodbSinkWriter.java  |  72 +++++++++++
 .../flink/v2/mongodb/FakeSourceToMongodbIT.java    | 137 +++++++++++++++++++++
 .../test/resources/mongodb/fake_to_mongodb.conf    |  74 +++++++++++
 .../{ => connector-mongodb-spark-e2e}/pom.xml      |  43 +++----
 .../spark/v2/mongodb/FakeSourceToMongodbIT.java    | 137 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  23 ++++
 .../test/resources/mongodb/fake_to_mongodb.conf    |  70 +++++++++++
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 +
 11 files changed, 663 insertions(+), 21 deletions(-)

diff --git a/docs/en/connector-v2/sink/MongoDB.md 
b/docs/en/connector-v2/sink/MongoDB.md
new file mode 100644
index 000000000..2768aa03c
--- /dev/null
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -0,0 +1,46 @@
+# MongoDB
+
+> MongoDB sink connector
+
+## Description
+
+Write data to `MongoDB`
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name       | type   | required | default value |
+|------------| ------ |----------| ------------- |
+| uri        | string | yes      | -             |
+| database   | string | yes      | -             |
+| collection | string | yes      | -             |
+
+### uri [string]
+
+uri to write to MongoDB
+
+### database [string]
+
+database to write to MongoDB
+
+### collection [string]
+
+collection to write to MongoDB
+
+## Example
+
+```bash
+mongodb {
+    uri = 
"mongodb://username:[email protected]:27017/mypost?retryWrites=true&writeConcern=majority"
+    database = "mydatabase"
+    collection = "mycollection"
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index e90f02534..cde36d782 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -127,4 +127,5 @@ seatunnel.sink.Redis = connector-redis
 seatunnel.sink.DataHub = connector-datahub
 seatunnel.sink.Sentry = connector-sentry
 seatunnel.source.MongoDB = connector-mongodb
+seatunnel.sink.MongoDB = connector-mongodb
 
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
new file mode 100644
index 000000000..e3abeed0b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private SeaTunnelRowType rowType;
+
+    private MongodbParameters params;
+
+    @Override
+    public String getPluginName() {
+        return "MongoDB";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URI, 
DATABASE, COLLECTION);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+        }
+
+        this.params = ConfigBeanFactory.create(config, 
MongodbParameters.class);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return rowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) throws IOException {
+        return new MongodbSinkWriter(rowType, params);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
new file mode 100644
index 000000000..a1000931c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import org.bson.Document;
+
+import java.io.IOException;
+
+public class MongodbSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final SeaTunnelRowType rowType;
+
+    private final SerializationSchema serializationSchema;
+
+    private MongoClient client;
+
+    private final String database;
+
+    private final String collection;
+
+    private final MongoCollection<Document> mongoCollection;
+
+    public MongodbSinkWriter(SeaTunnelRowType rowType, MongodbParameters 
params) {
+        this.rowType = rowType;
+        this.database = params.getDatabase();
+        this.collection = params.getCollection();
+        this.client = MongoClients.create(params.getUri());
+        this.mongoCollection = 
this.client.getDatabase(database).getCollection(collection);
+        this.serializationSchema = new JsonSerializationSchema(rowType);
+    }
+
+    @Override
+    public void write(SeaTunnelRow rows) throws IOException {
+        byte[] serialize = serializationSchema.serialize(rows);
+        String content = new String(serialize);
+
+        Document doc = Document.parse(content);
+        mongoCollection.insertOne(doc);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/FakeSourceToMongodbIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/FakeSourceToMongodbIT.java
new file mode 100644
index 000000000..b052ee00f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/FakeSourceToMongodbIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.mongodb;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCursor;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToMongodbIT extends FlinkContainer {
+
+    private static final String MONGODB_IMAGE = "mongo:latest";
+
+    private static final String MONGODB_CONTAINER_HOST = 
"flink_e2e_mongodb_sink";
+
+    private static final String MONGODB_HOST = "localhost";
+
+    private static final int MONGODB_PORT = 27017;
+
+    private static final String MONGODB_DATABASE = "test_db";
+
+    private static final String MONGODB_COLLECTION = "test_table";
+
+    private static final String MONGODB_URI = 
String.format("mongodb://%s:%d/%s", MONGODB_HOST, MONGODB_PORT, 
MONGODB_DATABASE);
+
+    private MongoClient client;
+
+    private GenericContainer<?> mongodbContainer;
+
+    @BeforeEach
+    public void startMongoContainer() {
+        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+        mongodbContainer = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MONGODB_CONTAINER_HOST)
+            .withExposedPorts(MONGODB_PORT)
+            .waitingFor(new HttpWaitStrategy()
+                .forPort(MONGODB_PORT)
+                .forStatusCodeMatching(response -> response == HTTP_OK || 
response == HTTP_UNAUTHORIZED)
+                .withStartupTimeout(Duration.ofMinutes(2)))
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        
mongodbContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
MONGODB_PORT, MONGODB_PORT)));
+        Startables.deepStart(Stream.of(mongodbContainer)).join();
+        log.info("Mongodb container started");
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+    }
+
+    public void initConnection() {
+        client = MongoClients.create(MONGODB_URI);
+    }
+
+    @Test
+    public void testMongodbSink() throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/mongodb/fake_to_mongodb.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        List<Map<String, Object>> list = new ArrayList<>();
+        try (MongoCursor<Document> mongoCursor = 
client.getDatabase(MONGODB_DATABASE)
+            .getCollection(MONGODB_COLLECTION)
+            .find()
+            .iterator()
+        ) {
+            while (mongoCursor.hasNext()) {
+                Document doc = mongoCursor.next();
+                HashMap<String, Object> map = new HashMap<>(doc.size());
+                Set<Map.Entry<String, Object>> entries = doc.entrySet();
+                for (Map.Entry<String, Object> entry : entries) {
+                    String key = entry.getKey();
+                    Object value = entry.getValue();
+                    map.put(key, value);
+                }
+                log.info("Document ===>>>: " + map);
+                list.add(map);
+            }
+        }
+
+        Assertions.assertEquals(10, list.size());
+    }
+
+    @AfterEach
+    public void close() {
+        super.close();
+        if (client != null) {
+            client.close();
+        }
+        if (mongodbContainer != null) {
+            mongodbContainer.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
new file mode 100644
index 000000000..482f92ee7
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #job.mode = "BATCH"
+  #job.mode = "STREAMING"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql
+}
+
+sink {
+  MongoDB {
+    uri = 
"mongodb://flink_e2e_mongodb_sink:27017/test_db?retryWrites=true&writeConcern=majority"
+    database = "test_db"
+    collection = "test_table"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/MongoDB
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
similarity index 63%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
copy to 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
index 898bf04c9..f10c0de4d 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
@@ -1,54 +1,55 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
-
-    <modules>
-        <module>connector-spark-e2e-base</module>
-        <module>connector-datahub-spark-e2e</module>
-        <module>connector-fake-spark-e2e</module>
-        <module>connector-file-spark-e2e</module>
-        <module>connector-iotdb-spark-e2e</module>
-        <module>connector-jdbc-spark-e2e</module>
-        <module>connector-redis-spark-e2e</module>
-    </modules>
+    <artifactId>connector-mongodb-spark-e2e</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-starter</artifactId>
+            <artifactId>connector-spark-e2e-base</artifactId>
             <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
 
+        <!-- SeaTunnel connectors -->
         <dependency>
-            <groupId>org.awaitility</groupId>
-            <artifactId>awaitility</artifactId>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-mongodb</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-</project>
\ No newline at end of file
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/FakeSourceToMongodbIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/FakeSourceToMongodbIT.java
new file mode 100644
index 000000000..a26307ca0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/FakeSourceToMongodbIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.mongodb;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCursor;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToMongodbIT extends SparkContainer {
+
+    private static final String MONGODB_IMAGE = "mongo:latest";
+
+    private static final String MONGODB_CONTAINER_HOST = 
"spark_e2e_mongodb_sink";
+
+    private static final String MONGODB_HOST = "localhost";
+
+    private static final int MONGODB_PORT = 27017;
+
+    private static final String MONGODB_DATABASE = "test_db";
+
+    private static final String MONGODB_COLLECTION = "test_table";
+
+    private static final String MONGODB_URI = 
String.format("mongodb://%s:%d/%s", MONGODB_HOST, MONGODB_PORT, 
MONGODB_DATABASE);
+
+    private MongoClient client;
+
+    private GenericContainer<?> mongodbContainer;
+
+    @BeforeEach
+    public void startMongoContainer() {
+        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+        mongodbContainer = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MONGODB_CONTAINER_HOST)
+            .withExposedPorts(MONGODB_PORT)
+            .waitingFor(new HttpWaitStrategy()
+                .forPort(MONGODB_PORT)
+                .forStatusCodeMatching(response -> response == HTTP_OK || 
response == HTTP_UNAUTHORIZED)
+                .withStartupTimeout(Duration.ofMinutes(2)))
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        
mongodbContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
MONGODB_PORT, MONGODB_PORT)));
+        Startables.deepStart(Stream.of(mongodbContainer)).join();
+        log.info("Mongodb container started");
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+    }
+
+    public void initConnection() {
+        client = MongoClients.create(MONGODB_URI);
+    }
+
+    @Test
+    public void testMongodbSink() throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/mongodb/fake_to_mongodb.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        List<Map<String, Object>> list = new ArrayList<>();
+        try (MongoCursor<Document> mongoCursor = 
client.getDatabase(MONGODB_DATABASE)
+            .getCollection(MONGODB_COLLECTION)
+            .find()
+            .iterator()
+        ) {
+            while (mongoCursor.hasNext()) {
+                Document doc = mongoCursor.next();
+                HashMap<String, Object> map = new HashMap<>(doc.size());
+                Set<Map.Entry<String, Object>> entries = doc.entrySet();
+                for (Map.Entry<String, Object> entry : entries) {
+                    String key = entry.getKey();
+                    Object value = entry.getValue();
+                    map.put(key, value);
+                }
+                log.info("Document ===>>>: " + map);
+                list.add(map);
+            }
+        }
+
+        Assertions.assertEquals(10, list.size());
+    }
+
+    @AfterEach
+    public void close() {
+        super.close();
+        if (client != null) {
+            client.close();
+        }
+        if (mongodbContainer != null) {
+            mongodbContainer.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/log4j.properties
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..89ed3ad31
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
new file mode 100644
index 000000000..0c978615e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql
+}
+
+sink {
+  MongoDB {
+    uri = 
"mongodb://spark_e2e_mongodb_sink:27017/test_db?retryWrites=true&writeConcern=majority"
+    database = "test_db"
+    collection = "test_table"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/MongoDB
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 898bf04c9..c1fd6c512 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
         <module>connector-iotdb-spark-e2e</module>
         <module>connector-jdbc-spark-e2e</module>
         <module>connector-redis-spark-e2e</module>
+        <module>connector-mongodb-spark-e2e</module>
     </modules>
 
     <dependencies>

Reply via email to