This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v1.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
commit e2aa8b332552853f2e59f7f46af3cbe9f7573741 Author: Jiabao Sun <jiabao....@xtransfer.cn> AuthorDate: Mon Jan 29 09:50:00 2024 +0800 [hotfix][test][connectors/mongodb] Update MongoWriterITCase to be compatible with updated SinkV2 interfaces This closes #22. --- .../mongodb/sink/writer/MongoWriterITCase.java | 33 +++++++++++----------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java index 84767e8..bd3ca66 100644 --- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java +++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java @@ -17,8 +17,9 @@ package org.apache.flink.connector.mongodb.sink.writer; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; -import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions; +import org.apache.flink.connector.mongodb.sink.MongoSink; import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions; import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext; import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema; @@ -49,6 +50,7 @@ import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import java.io.IOException; import java.util.Optional; import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten; @@ -235,7 +237,7 @@ public class MongoWriterITCase { MongoWriteOptions.builder() .setBatchSize(batchSize) .setBatchIntervalMs(batchIntervalMs) - .setMaxRetries(0) + .setDeliveryGuarantee(DeliveryGuarantee.NONE) .build(); MongoSerializationSchema<Document> testSerializationSchema = @@ -269,7 +271,8 @@ public class MongoWriterITCase { } private static MongoWriter<Document> createWriter( - String collection, int batchSize, long batchIntervalMs, boolean flushOnCheckpoint) { + String collection, int batchSize, long batchIntervalMs, boolean flushOnCheckpoint) + throws IOException { return createWriter( collection, batchSize, @@ -283,28 +286,24 @@ public class MongoWriterITCase { int batchSize, long batchIntervalMs, boolean flushOnCheckpoint, - MongoSerializationSchema<Document> serializationSchema) { + MongoSerializationSchema<Document> serializationSchema) + throws IOException { - MongoConnectionOptions connectionOptions = - MongoConnectionOptions.builder() + MongoSink<Document> mongoSink = + MongoSink.<Document>builder() .setUri(MONGO_CONTAINER.getConnectionString()) .setDatabase(TEST_DATABASE) .setCollection(collection) - .build(); - - MongoWriteOptions writeOptions = - MongoWriteOptions.builder() .setBatchSize(batchSize) .setBatchIntervalMs(batchIntervalMs) - .setMaxRetries(0) + .setDeliveryGuarantee( + flushOnCheckpoint + ? DeliveryGuarantee.AT_LEAST_ONCE + : DeliveryGuarantee.NONE) + .setSerializationSchema(serializationSchema) .build(); - return new MongoWriter<>( - connectionOptions, - writeOptions, - flushOnCheckpoint, - sinkInitContext, - serializationSchema); + return (MongoWriter<Document>) mongoSink.createWriter(sinkInitContext); } private static Document buildMessage(int id) {