This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git

The following commit(s) were added to refs/heads/main by this push:
     new 93534df  [hotfix] Fix unstable test of MongoSinkITCase.testRecovery
93534df is described below

commit 93534df2d018504d52240068d398708ba008c2f6
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Fri Mar 31 19:28:08 2023 +0800

    [hotfix] Fix unstable test of MongoSinkITCase.testRecovery
---
 .../flink/connector/mongodb/sink/MongoSinkITCase.java | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 05371d4..3cd5e71 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -33,7 +33,8 @@ import org.apache.flink.testutils.junit.SharedReference;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.client.model.WriteModel;
 import org.bson.BsonDocument;
 import org.bson.Document;
@@ -135,7 +136,7 @@ public class MongoSinkITCase {
                 .setCollection(collection)
                 .setBatchSize(5)
                 .setDeliveryGuarantee(deliveryGuarantee)
-                .setSerializationSchema(new AppendOnlySerializationSchema())
+                .setSerializationSchema(new UpsertSerializationSchema())
                 .build();
     }
 
@@ -154,11 +155,15 @@ public class MongoSinkITCase {
         }
     }
 
-    private static class AppendOnlySerializationSchema
-            implements MongoSerializationSchema<Document> {
+    private static class UpsertSerializationSchema implements MongoSerializationSchema<Document> {
         @Override
         public WriteModel<BsonDocument> serialize(Document element, MongoSinkContext sinkContext) {
-            return new InsertOneModel<>(element.toBsonDocument());
+            BsonDocument document = element.toBsonDocument();
+            BsonDocument filter = new BsonDocument("_id", document.get("_id"));
+            // _id is immutable so we remove it here to prevent exception.
+            document.remove("_id");
+            BsonDocument update = new BsonDocument("$set", document);
+            return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
         }
     }
 