This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8c569b1541 [Feature][CDC] Support MongoDB CDC running on flink (#5644)
8c569b1541 is described below

commit 8c569b1541e2c2358cfb3357b8814bfc0415b6ec
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Oct 17 18:25:39 2023 +0800

    [Feature][CDC] Support MongoDB CDC running on flink (#5644)
    
    
    
    ---------
    
    Co-authored-by: zhouyao <[email protected]>
---
 docs/en/connector-v2/source/MongoDB-CDC.md         |  1 +
 .../MongoDBConnectorDeserializationSchema.java     | 11 ++------
 .../src/test/java/mongodb/MongodbCDCIT.java        | 30 ++++++++++++++++++++--
 .../src/test/resources/ddl/inventoryClean.js       | 16 ++++++++++++
 4 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md 
b/docs/en/connector-v2/source/MongoDB-CDC.md
index d78f70110f..17fe09e3b2 100644
--- a/docs/en/connector-v2/source/MongoDB-CDC.md
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -5,6 +5,7 @@
 ## Support Those Engines
 
 > SeaTunnel Zeta<br/>
+> Flink<br/>
 
 ## Key Features
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 6f36f4be83..4df666d2ad 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -64,7 +64,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
-import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
@@ -101,11 +100,7 @@ public class MongoDBConnectorDeserializationSchema
                 emit(record, insert, out);
                 break;
             case DELETE:
-                SeaTunnelRow delete =
-                        new SeaTunnelRow(
-                                new Object[] {
-                                    
documentKey.get(ID_FIELD).asObjectId().getValue().toString()
-                                });
+                SeaTunnelRow delete = extractRowData(documentKey);
                 delete.setRowKind(RowKind.DELETE);
                 emit(record, delete, out);
                 break;
@@ -190,9 +185,7 @@ public class MongoDBConnectorDeserializationSchema
             @Override
             public Object apply(BsonValue bsonValue) {
                 if (isBsonValueNull(bsonValue) || isBsonDecimalNaN(bsonValue)) 
{
-                    throw new MongodbConnectorException(
-                            UNSUPPORTED_OPERATION,
-                            "Unable to convert to <" + type + "> from nullable 
value " + bsonValue);
+                    return null;
                 }
                 return internalConverter.apply(bsonValue);
             }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index c01b36ef18..01a4c0a0f5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -65,8 +65,8 @@ import static org.awaitility.Awaitility.await;
 @Slf4j
 @DisabledOnContainer(
         value = {},
-        type = {EngineType.SPARK, EngineType.FLINK},
-        disabledReason = "Currently SPARK and FLINK do not support cdc")
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support cdc")
 public class MongodbCDCIT extends TestSuiteBase implements TestResource {
 
     // 
----------------------------------------------------------------------------
@@ -182,6 +182,28 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
         // insert update delete
         upsertDeleteSourceTable();
 
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    readMongodbData().stream()
+                                            .peek(e -> e.remove("_id"))
+                                            .map(Document::entrySet)
+                                            .map(Set::stream)
+                                            .map(
+                                                    entryStream ->
+                                                            entryStream
+                                                                    
.map(Map.Entry::getValue)
+                                                                    .collect(
+                                                                            
Collectors.toCollection(
+                                                                               
     ArrayList
+                                                                               
             ::new)))
+                                            .collect(Collectors.toList()),
+                                    querySql());
+                        });
+
+        cleanSourceTable();
+
         await().atMost(60000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
@@ -233,6 +255,10 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
         mongodbContainer.executeCommandFileInDatabase("inventoryDDL", 
MONGODB_DATABASE);
     }
 
+    private void cleanSourceTable() {
+        mongodbContainer.executeCommandFileInDatabase("inventoryClean", 
MONGODB_DATABASE);
+    }
+
     public void initConnection() {
         String ipAddress = mongodbContainer.getHost();
         Integer port = mongodbContainer.getFirstMappedPort();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
new file mode 100644
index 0000000000..fbbb0ea0df
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
@@ -0,0 +1,16 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+//  -- this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('products').deleteMany({})

Reply via email to