This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
The following commit(s) were added to refs/heads/main by this push:
     new e0050b4  [FLINK-37950] Supporting ordered writes & `bypassDocumentValidation` behavior for sink writer
e0050b4 is described below
commit e0050b4a0f9bc6b079a0cbe891339b9e9f612b21
Author: Rahul Teke <[email protected]>
AuthorDate: Tue Feb 10 05:16:17 2026 +0000
    [FLINK-37950] Supporting ordered writes & `bypassDocumentValidation` behavior for sink writer
---
docs/content/docs/connectors/datastream/mongodb.md | 8 +-
docs/content/docs/connectors/table/mongodb.md | 18 +++-
.../connector/mongodb/sink/MongoSinkBuilder.java | 22 ++++
.../mongodb/sink/config/MongoWriteOptions.java | 63 +++++++++++-
.../connector/mongodb/sink/writer/MongoWriter.java | 8 +-
.../mongodb/table/MongoConnectorOptions.java | 12 +++
.../connector/mongodb/sink/MongoSinkITCase.java | 114 +++++++++++++++++++--
.../mongodb/sink/writer/MongoWriterITCase.java | 8 +-
.../connector/mongodb/testutils/MongoTestUtil.java | 28 ++++-
9 files changed, 263 insertions(+), 18 deletions(-)
diff --git a/docs/content/docs/connectors/datastream/mongodb.md b/docs/content/docs/connectors/datastream/mongodb.md
index b59d605..3e195ab 100644
--- a/docs/content/docs/connectors/datastream/mongodb.md
+++ b/docs/content/docs/connectors/datastream/mongodb.md
@@ -204,7 +204,13 @@ Flink's MongoDB sink is created by using the static builder `MongoSink.<InputTyp
7. _setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)_
* Optional. Default: `DeliveryGuarantee.AT_LEAST_ONCE`.
   * Sets the wanted `DeliveryGuarantee`. The `EXACTLY_ONCE` guarantee is not supported yet.
-8. __setSerializationSchema(MongoSerializationSchema<InputType> serializationSchema)__
+8. _setOrderedWrites(boolean ordered)_
+   * Optional. Default: `true`.
+   * Sets the MongoDB driver option to perform ordered bulk writes.
+9. _setBypassDocumentValidation(boolean bypassDocumentValidation)_
+   * Optional. Default: `false`.
+   * Sets the MongoDB driver option to bypass document validation.
+10. __setSerializationSchema(MongoSerializationSchema<InputType> serializationSchema)__
* Required.
   * A `MongoSerializationSchema` is required to convert each input record into a MongoDB [WriteModel](https://www.mongodb.com/docs/drivers/java/sync/current/usage-examples/bulkWrite/).
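
For orientation, here is a minimal sketch of how the two new builder calls compose with the existing ones. The URI, database, collection, and the inline serialization schema are illustrative placeholders, not part of this commit:

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;

import com.mongodb.client.model.InsertOneModel;
import org.bson.BsonDocument;

public class OrderedWriteSinkSketch {
    public static MongoSink<String> build() {
        return MongoSink.<String>builder()
                .setUri("mongodb://localhost:27017") // placeholder connection string
                .setDatabase("test_db") // placeholder database
                .setCollection("test_coll") // placeholder collection
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                // New in this commit: unordered bulk writes, skip validation.
                .setOrderedWrites(false)
                .setBypassDocumentValidation(true)
                // Trivial example schema: each String record becomes one insert.
                .setSerializationSchema(
                        (input, context) -> new InsertOneModel<>(BsonDocument.parse(input)))
                .build();
    }
}
```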
diff --git a/docs/content/docs/connectors/table/mongodb.md b/docs/content/docs/connectors/table/mongodb.md
index 29b90d9..0d4f065 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -297,7 +297,23 @@ Connector Options
      <td style="word-wrap: break-word;">at-least-once</td>
<td><p>Enum</p>Possible values: none, at-least-once</td>
      <td>Optional delivery guarantee when committing. The exactly-once guarantee is not supported yet.</td>
- </tr>
+ </tr>
+ <tr>
+ <td><h5>sink.ordered-writes</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+      <td>Defines the MongoDB driver option to perform ordered bulk writes. Defaults to true (ordered writes).</td>
+ </tr>
+ <tr>
+ <td><h5>sink.bypass-document-validation</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+      <td>Defines the MongoDB driver option to bypass document validation. Defaults to false, i.e. documents are validated.</td>
+ </tr>
</tbody>
</table>
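
As a usage sketch for the table connector (table name, schema, and connection values are made up for illustration), the two new options slot into an ordinary CREATE TABLE definition:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoSinkSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE mongo_sink (\n"
                        + "  _id BIGINT,\n"
                        + "  f1 STRING,\n"
                        + "  PRIMARY KEY (_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mongodb',\n"
                        + "  'uri' = 'mongodb://localhost:27017',\n"
                        + "  'database' = 'test_db',\n"
                        + "  'collection' = 'test_coll',\n"
                        // The two options added by this commit:
                        + "  'sink.ordered-writes' = 'false',\n"
                        + "  'sink.bypass-document-validation' = 'true'\n"
                        + ")");
    }
}
```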
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
index 49aa35b..7199b2a 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
@@ -126,6 +126,28 @@ public class MongoSinkBuilder<IN> {
return this;
}
+    /**
+     * Sets whether bulk writes are ordered; see {@link
+     * com.mongodb.client.model.BulkWriteOptions#ordered(boolean)}.
+     *
+     * @param ordered whether the write operations must be applied in the order given
+     * @return this builder
+     */
+ public MongoSinkBuilder<IN> setOrderedWrites(boolean ordered) {
+ writeOptionsBuilder.setOrderedWrites(ordered);
+ return this;
+ }
+
+    /**
+     * Sets whether document validation is bypassed; see {@link
+     * com.mongodb.client.model.BulkWriteOptions#bypassDocumentValidation(Boolean)}.
+     *
+     * @param bypassDocumentValidation whether to bypass document validation
+     * @return this builder
+     */
+    public MongoSinkBuilder<IN> setBypassDocumentValidation(boolean bypassDocumentValidation) {
+        writeOptionsBuilder.setBypassDocumentValidation(bypassDocumentValidation);
+ return this;
+ }
+
/**
     * Sets the serialization schema which is invoked on every record to convert it to MongoDB bulk
* request.
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
index 15f4293..576d199 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
@@ -26,7 +26,9 @@ import java.util.Objects;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_BYPASS_VALIDATION;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_ORDERED_WRITES;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,6 +42,8 @@ public final class MongoWriteOptions implements Serializable {
private static final long serialVersionUID = 1L;
+ private final boolean orderedWrites;
+ private final boolean bypassDocumentValidation;
private final int batchSize;
private final long batchIntervalMs;
private final int maxRetries;
@@ -47,11 +51,15 @@ public final class MongoWriteOptions implements Serializable {
private final DeliveryGuarantee deliveryGuarantee;
private MongoWriteOptions(
+ boolean orderedWrites,
+ boolean bypassDocumentValidation,
int batchSize,
long batchIntervalMs,
int maxRetries,
long retryIntervalMs,
DeliveryGuarantee deliveryGuarantee) {
+ this.orderedWrites = orderedWrites;
+ this.bypassDocumentValidation = bypassDocumentValidation;
this.batchSize = batchSize;
this.batchIntervalMs = batchIntervalMs;
this.maxRetries = maxRetries;
@@ -59,6 +67,14 @@ public final class MongoWriteOptions implements Serializable {
this.deliveryGuarantee = deliveryGuarantee;
}
+ public boolean isOrderedWrites() {
+ return orderedWrites;
+ }
+
+ public boolean isBypassDocumentValidation() {
+ return bypassDocumentValidation;
+ }
+
public int getBatchSize() {
return batchSize;
}
@@ -88,7 +104,9 @@ public final class MongoWriteOptions implements Serializable {
return false;
}
MongoWriteOptions that = (MongoWriteOptions) o;
- return batchSize == that.batchSize
+ return orderedWrites == that.orderedWrites
+ && bypassDocumentValidation == that.bypassDocumentValidation
+ && batchSize == that.batchSize
&& batchIntervalMs == that.batchIntervalMs
&& maxRetries == that.maxRetries
&& retryIntervalMs == that.retryIntervalMs
@@ -98,7 +116,13 @@ public final class MongoWriteOptions implements Serializable {
@Override
public int hashCode() {
return Objects.hash(
-                batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
+ orderedWrites,
+ bypassDocumentValidation,
+ batchSize,
+ batchIntervalMs,
+ maxRetries,
+ retryIntervalMs,
+ deliveryGuarantee);
}
public static MongoWriteOptionsBuilder builder() {
@@ -108,6 +132,8 @@ public final class MongoWriteOptions implements Serializable {
/** Builder for {@link MongoWriteOptions}. */
@PublicEvolving
public static class MongoWriteOptionsBuilder {
+ private boolean orderedWrites = SINK_ORDERED_WRITES.defaultValue();
+        private boolean bypassDocumentValidation = SINK_BYPASS_VALIDATION.defaultValue();
        private int batchSize = BUFFER_FLUSH_MAX_ROWS.defaultValue();
        private long batchIntervalMs = BUFFER_FLUSH_INTERVAL.defaultValue().toMillis();
private int maxRetries = SINK_MAX_RETRIES.defaultValue();
@@ -116,6 +142,31 @@ public final class MongoWriteOptions implements Serializable {
private MongoWriteOptionsBuilder() {}
+ /**
+         * Sets the {@code ordered} flag of the MongoDB bulk write {@link
+         * com.mongodb.client.model.BulkWriteOptions}.
+         *
+         * @param orderedWrites whether the bulk write operations must be applied in order
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setOrderedWrites(boolean orderedWrites) {
+ this.orderedWrites = orderedWrites;
+ return this;
+ }
+
+ /**
+         * Sets the {@code bypassDocumentValidation} flag of the MongoDB bulk write {@link
+         * com.mongodb.client.model.BulkWriteOptions}.
+         *
+         * @param bypassDocumentValidation whether to bypass document validation
+ * @return this builder
+ */
+ public MongoWriteOptionsBuilder setBypassDocumentValidation(
+ boolean bypassDocumentValidation) {
+ this.bypassDocumentValidation = bypassDocumentValidation;
+ return this;
+ }
+
/**
         * Sets the maximum number of actions to buffer for each batch request. You can pass -1 to
* disable batching.
@@ -195,7 +246,13 @@ public final class MongoWriteOptions implements Serializable {
*/
public MongoWriteOptions build() {
return new MongoWriteOptions(
-                    batchSize, batchIntervalMs, maxRetries, retryIntervalMs, deliveryGuarantee);
+ orderedWrites,
+ bypassDocumentValidation,
+ batchSize,
+ batchIntervalMs,
+ maxRetries,
+ retryIntervalMs,
+ deliveryGuarantee);
}
}
}
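
A short sketch of the builder round trip the new fields take. MongoWriteOptions is normally constructed for you by MongoSinkBuilder, so this is only to illustrate the defaults and the new accessors:

```java
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;

class WriteOptionsSketch {
    static void demo() {
        // Defaults mirror the table options: ordered = true, bypass = false.
        MongoWriteOptions defaults = MongoWriteOptions.builder().build();
        assert defaults.isOrderedWrites();
        assert !defaults.isBypassDocumentValidation();

        MongoWriteOptions custom =
                MongoWriteOptions.builder()
                        .setOrderedWrites(false)
                        .setBypassDocumentValidation(true)
                        .build();
        assert !custom.isOrderedWrites();
        assert custom.isBypassDocumentValidation();
    }
}
```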
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index cbd9a61..b806f9e 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
+import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;
import org.slf4j.Logger;
@@ -213,7 +214,12 @@ public class MongoWriter<IN> implements SinkWriter<IN> {
mongoClient
.getDatabase(connectionOptions.getDatabase())
                    .getCollection(connectionOptions.getCollection(), BsonDocument.class)
- .bulkWrite(bulkRequests);
+ .bulkWrite(
+ bulkRequests,
+ new BulkWriteOptions()
+                            .ordered(writeOptions.isOrderedWrites())
+                            .bypassDocumentValidation(
+                                    writeOptions.isBypassDocumentValidation()));
ackTime = System.currentTimeMillis();
bulkRequests.clear();
break;
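
The hunk above is the whole integration point: both flags are forwarded unchanged to the driver's BulkWriteOptions on every bulk flush. A plain-driver equivalent of the call the writer now issues (connection details are hypothetical):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import org.bson.BsonDocument;

import java.util.List;

class BulkWriteSketch {
    static void run() {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            client.getDatabase("test_db")
                    .getCollection("test_coll", BsonDocument.class)
                    .bulkWrite(
                            List.of(new InsertOneModel<>(BsonDocument.parse("{\"_id\": 1}"))),
                            new BulkWriteOptions()
                                    // unordered: the server may reorder ops and continues past errors
                                    .ordered(false)
                                    // skip any collection-level validation rules
                                    .bypassDocumentValidation(true));
        }
    }
}
```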
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
index 36948f2..b03e70c 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
@@ -115,6 +115,18 @@ public class MongoConnectorOptions {
                    .withDescription(
                            "Specifies the retry time interval if lookup records from database failed.");
+    public static final ConfigOption<Boolean> SINK_ORDERED_WRITES =
+            ConfigOptions.key("sink.ordered-writes")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Specifies the MongoDB bulk write option to perform ordered writes.");
+
+    public static final ConfigOption<Boolean> SINK_BYPASS_VALIDATION =
+            ConfigOptions.key("sink.bypass-document-validation")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Specifies the MongoDB bulk write option to bypass document validation.");
+
public static final ConfigOption<Integer> BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index d7b4a07..0e1c274 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -34,12 +35,18 @@ import org.apache.flink.testutils.junit.SharedReference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.ValidationAction;
+import com.mongodb.client.model.ValidationOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;
+import org.bson.BsonType;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -54,7 +61,9 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenInAnyOrder;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenInOrder;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link MongoSink}. */
@@ -93,6 +102,93 @@ class MongoSinkITCase {
}
}
+ @Test
+ void testOrderedWrite() throws Exception {
+ final String collection = "test-sink-with-ordered-write";
+ final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);
+ Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.enableCheckpointing(100L);
+
+ env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+ env.execute();
+        assertThatIdsAreWrittenInOrder(collectionOf(collection), 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ void testUnorderedWrite() throws Exception {
+ final String collection = "test-sink-with-unordered-write";
+ final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, false);
+ Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.enableCheckpointing(100L);
+
+ env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+ env.execute();
+        assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ void testDocumentValidation() throws Exception {
+ final String collection = "test-sink-with-doc-validation";
+ // Create collection with validation
+ ValidationOptions validationOptions =
+                new ValidationOptions().validator(Filters.type("_id", BsonType.INT64));
+ validationOptions.validationAction(ValidationAction.ERROR);
+        CreateCollectionOptions createCollectionOptions = new CreateCollectionOptions();
+ createCollectionOptions.validationOptions(validationOptions);
+ mongoClient
+ .getDatabase(TEST_DATABASE)
+ .createCollection(collection, createCollectionOptions);
+
+ final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);
+ Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.enableCheckpointing(100L);
+
+ String[] data = new String[] {"1", "2", "3", "4", "5", "A"};
+        env.fromData(data).map(id -> new Document("_id", id).append("f1", "d_" + id)).sinkTo(sink);
+ Assertions.assertThrows(JobExecutionException.class, env::execute);
+
+ assertThatIdsAreNotWritten(collectionOf(collection), data);
+ }
+
+ @Test
+ void testBypassDocumentValidation() throws Exception {
+ final String collection = "test-sink-with-bypass-doc-validation";
+ // Create collection with validation
+ ValidationOptions validationOptions =
+                new ValidationOptions().validator(Filters.type("_id", BsonType.INT64));
+ validationOptions.validationAction(ValidationAction.ERROR);
+        CreateCollectionOptions createCollectionOptions = new CreateCollectionOptions();
+ createCollectionOptions.validationOptions(validationOptions);
+ mongoClient
+ .getDatabase(TEST_DATABASE)
+ .createCollection(collection, createCollectionOptions);
+
+ final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, true);
+ Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.enableCheckpointing(100L);
+
+ String[] data = new String[] {"1", "2", "3", "4", "5", "A"};
+        env.fromData(data).map(id -> new Document("_id", id).append("f1", "d_" + id)).sinkTo(sink);
+ env.execute();
+ assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), data);
+ }
+
@ParameterizedTest
@EnumSource(
value = DeliveryGuarantee.class,
@@ -101,7 +197,7 @@ class MongoSinkITCase {
    void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)
throws Exception {
        final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
-        final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
+        final MongoSink<Document> sink = createSink(collection, deliveryGuarantee, true, false);
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
final StreamExecutionEnvironment env =
@@ -110,13 +206,14 @@ class MongoSinkITCase {
env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
env.execute();
- assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+        assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 4, 5);
}
@Test
void testRecovery() throws Exception {
final String collection = "test-recovery-mongo-sink";
-        final MongoSink<Document> sink = createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE);
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
@@ -129,18 +226,23 @@ class MongoSinkITCase {
.sinkTo(sink);
env.execute();
- assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+        assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 4, 5);
assertThat(failed.get()).isTrue();
}
private static MongoSink<Document> createSink(
- String collection, DeliveryGuarantee deliveryGuarantee) {
+ String collection,
+ DeliveryGuarantee deliveryGuarantee,
+ boolean ordered,
+ boolean bypassDocumentValidation) {
return MongoSink.<Document>builder()
.setUri(MONGO_CONTAINER.getConnectionString())
.setDatabase(TEST_DATABASE)
.setCollection(collection)
.setBatchSize(5)
.setDeliveryGuarantee(deliveryGuarantee)
+ .setOrderedWrites(ordered)
+ .setBypassDocumentValidation(bypassDocumentValidation)
.setSerializationSchema(new UpsertSerializationSchema())
.build();
}
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index 2c5ba84..13cff69 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -55,7 +55,7 @@ import java.io.IOException;
import java.util.Optional;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
-import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenInAnyOrder;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -120,14 +120,14 @@ class MongoWriterITCase {
// Trigger flush
writer.write(buildMessage(5), null);
- assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+            assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 4, 5);
writer.write(buildMessage(6), null);
assertThatIdsAreNotWritten(collectionOf(collection), 6);
// Force flush
writer.doBulkWrite();
-            assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5, 6);
+            assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 4, 5, 6);
}
}
@@ -169,7 +169,7 @@ class MongoWriterITCase {
// Trigger flush
writer.flush(false);
- assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3);
+            assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3);
}
}
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
index d6c5bf4..ea0ed54 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
@@ -87,13 +87,37 @@ public class MongoTestUtil {
assertThat(idsAreWritten).isEmpty();
}
-    public static void assertThatIdsAreWritten(MongoCollection<Document> coll, Integer... ids) {
+    public static void assertThatIdsAreNotWritten(MongoCollection<Document> coll, String... ids) {
+        List<String> idsAreWritten = new ArrayList<>();
+        coll.find(Filters.in("_id", ids)).map(d -> d.getString("_id")).into(idsAreWritten);
+
+ assertThat(idsAreWritten).isEmpty();
+ }
+
+ public static void assertThatIdsAreWrittenInAnyOrder(
+ MongoCollection<Document> coll, Integer... ids) {
List<Integer> actualIds = new ArrayList<>();
        coll.find(Filters.in("_id", ids)).map(d -> d.getInteger("_id")).into(actualIds);
assertThat(actualIds).containsExactlyInAnyOrder(ids);
}
+ public static void assertThatIdsAreWrittenInAnyOrder(
+ MongoCollection<Document> coll, String... ids) {
+ List<String> actualIds = new ArrayList<>();
+        coll.find(Filters.in("_id", ids)).map(d -> d.getString("_id")).into(actualIds);
+
+ assertThat(actualIds).containsExactlyInAnyOrder(ids);
+ }
+
+ public static void assertThatIdsAreWrittenInOrder(
+ MongoCollection<Document> coll, Integer... ids) {
+ List<Integer> actualIds = new ArrayList<>();
+        coll.find(Filters.in("_id", ids)).map(d -> d.getInteger("_id")).into(actualIds);
+
+ assertThat(actualIds).containsExactly(ids);
+ }
+
public static void assertThatIdsAreWrittenWithMaxWaitTime(
MongoCollection<Document> coll, long maxWaitTimeMs, Integer... ids)
throws InterruptedException {
@@ -104,7 +128,7 @@ public class MongoTestUtil {
}
Thread.sleep(1000L);
}
- assertThatIdsAreWritten(coll, ids);
+ assertThatIdsAreWrittenInAnyOrder(coll, ids);
}
public static String getConnectorSql(