This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new 93534df  [hotfix] Fix unstable test of MongoSinkITCase.testRecovery
93534df is described below

commit 93534df2d018504d52240068d398708ba008c2f6
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Fri Mar 31 19:28:08 2023 +0800

    [hotfix] Fix unstable test of MongoSinkITCase.testRecovery
---
 .../flink/connector/mongodb/sink/MongoSinkITCase.java     | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 05371d4..3cd5e71 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -33,7 +33,8 @@ import org.apache.flink.testutils.junit.SharedReference;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.client.model.WriteModel;
 import org.bson.BsonDocument;
 import org.bson.Document;
@@ -135,7 +136,7 @@ public class MongoSinkITCase {
                 .setCollection(collection)
                 .setBatchSize(5)
                 .setDeliveryGuarantee(deliveryGuarantee)
-                .setSerializationSchema(new AppendOnlySerializationSchema())
+                .setSerializationSchema(new UpsertSerializationSchema())
                 .build();
     }
 
@@ -154,11 +155,15 @@ public class MongoSinkITCase {
         }
     }
 
-    private static class AppendOnlySerializationSchema
-            implements MongoSerializationSchema<Document> {
+    private static class UpsertSerializationSchema implements MongoSerializationSchema<Document> {
         @Override
         public WriteModel<BsonDocument> serialize(Document element, MongoSinkContext sinkContext) {
-            return new InsertOneModel<>(element.toBsonDocument());
+            BsonDocument document = element.toBsonDocument();
+            BsonDocument filter = new BsonDocument("_id", document.get("_id"));
+            // _id is immutable so we remove it here to prevent exception.
+            document.remove("_id");
+            BsonDocument update = new BsonDocument("$set", document);
+            return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
         }
     }
 

Reply via email to