This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v1.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git

commit e2aa8b332552853f2e59f7f46af3cbe9f7573741
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Mon Jan 29 09:50:00 2024 +0800

    [hotfix][test][connectors/mongodb] Update MongoWriterITCase to be 
compatible with updated SinkV2 interfaces
    
    This closes #22.
---
 .../mongodb/sink/writer/MongoWriterITCase.java     | 33 +++++++++++-----------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index 84767e8..bd3ca66 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -17,8 +17,9 @@
 
 package org.apache.flink.connector.mongodb.sink.writer;
 
+import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
-import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.MongoSink;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
 import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -49,6 +50,7 @@ import org.testcontainers.containers.MongoDBContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
+import java.io.IOException;
 import java.util.Optional;
 
 import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
@@ -235,7 +237,7 @@ public class MongoWriterITCase {
                 MongoWriteOptions.builder()
                         .setBatchSize(batchSize)
                         .setBatchIntervalMs(batchIntervalMs)
-                        .setMaxRetries(0)
+                        .setDeliveryGuarantee(DeliveryGuarantee.NONE)
                         .build();
 
         MongoSerializationSchema<Document> testSerializationSchema =
@@ -269,7 +271,8 @@ public class MongoWriterITCase {
     }
 
     private static MongoWriter<Document> createWriter(
-            String collection, int batchSize, long batchIntervalMs, boolean 
flushOnCheckpoint) {
+            String collection, int batchSize, long batchIntervalMs, boolean 
flushOnCheckpoint)
+            throws IOException {
         return createWriter(
                 collection,
                 batchSize,
@@ -283,28 +286,24 @@ public class MongoWriterITCase {
             int batchSize,
             long batchIntervalMs,
             boolean flushOnCheckpoint,
-            MongoSerializationSchema<Document> serializationSchema) {
+            MongoSerializationSchema<Document> serializationSchema)
+            throws IOException {
 
-        MongoConnectionOptions connectionOptions =
-                MongoConnectionOptions.builder()
+        MongoSink<Document> mongoSink =
+                MongoSink.<Document>builder()
                         .setUri(MONGO_CONTAINER.getConnectionString())
                         .setDatabase(TEST_DATABASE)
                         .setCollection(collection)
-                        .build();
-
-        MongoWriteOptions writeOptions =
-                MongoWriteOptions.builder()
                         .setBatchSize(batchSize)
                         .setBatchIntervalMs(batchIntervalMs)
-                        .setMaxRetries(0)
+                        .setDeliveryGuarantee(
+                                flushOnCheckpoint
+                                        ? DeliveryGuarantee.AT_LEAST_ONCE
+                                        : DeliveryGuarantee.NONE)
+                        .setSerializationSchema(serializationSchema)
                         .build();
 
-        return new MongoWriter<>(
-                connectionOptions,
-                writeOptions,
-                flushOnCheckpoint,
-                sinkInitContext,
-                serializationSchema);
+        return (MongoWriter<Document>) mongoSink.createWriter(sinkInitContext);
     }
 
     private static Document buildMessage(int id) {

Reply via email to