zentol commented on code in PR #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1010520700


##########
pom.xml:
##########
@@ -0,0 +1,457 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>io.github.zentol.flink</groupId>
+               <artifactId>flink-connector-parent</artifactId>
+               <version>1.0</version>
+       </parent>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-mongodb-parent</artifactId>
+       <version>1.0.0-SNAPSHOT</version>
+
+       <name>Flink : Connectors : MongoDB Parent</name>
+       <packaging>pom</packaging>
+       <url>https://flink.apache.org</url>
+       <inceptionYear>2022</inceptionYear>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+                       <distribution>repo</distribution>
+               </license>
+       </licenses>
+
+       <scm>
+               <url>https://github.com/apache/flink-connector-mongodb</url>
+               <connection>[email protected]:apache/flink-connector-mongodb.git</connection>
+               <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git</developerConnection>
+       </scm>
+
+       <properties>
+               <mongodb.version>4.7.1</mongodb.version>
+
+               <flink.version>1.16-SNAPSHOT</flink.version>
+               <flink.shaded.version>15.0</flink.shaded.version>
+
+               <junit4.version>4.13.2</junit4.version>
+               <junit5.version>5.8.1</junit5.version>
+               <hamcrest.version>1.3</hamcrest.version>

Review Comment:
   This shouldn't be required because all assertions should go through assertj.
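   For reference, the same checks are straightforward to express with AssertJ directly (a minimal sketch; the test class and values are made up for illustration):
   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   import org.junit.jupiter.api.Test;

   import java.util.Arrays;
   import java.util.List;

   class AssertjStyleExampleTest {

       @Test
       void assertionsGoThroughAssertj() {
           List<String> collections = Arrays.asList("coll_a", "coll_b");

           // Hamcrest style would be: assertThat(collections, hasItem("coll_a"));
           // the AssertJ equivalent needs no extra dependency:
           assertThat(collections).contains("coll_a").hasSize(2);
       }
   }
   ```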



##########
pom.xml:
##########
@@ -0,0 +1,457 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>io.github.zentol.flink</groupId>
+               <artifactId>flink-connector-parent</artifactId>
+               <version>1.0</version>
+       </parent>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-mongodb-parent</artifactId>
+       <version>1.0.0-SNAPSHOT</version>

Review Comment:
   ```suggestion
        <version>1.0-SNAPSHOT</version>
   ```



##########
flink-connector-mongodb/pom.xml:
##########
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <artifactId>flink-connector-mongodb-parent</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.0.0-SNAPSHOT</version>
+               <relativePath>../pom.xml</relativePath>

Review Comment:
   ```suggestion
                <version>1.0-SNAPSHOT</version>
   ```



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoSerdeUtils.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.common.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A util class with some helper method for serde in the MongoDB source. */
+@Internal
+public class MongoSerdeUtils {

Review Comment:
   This class is lacking tests.
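   A simple round-trip test could look roughly like this (a sketch only; the helper names and signatures are assumptions and need to match what MongoSerdeUtils actually exposes):
   ```java
   import org.junit.jupiter.api.Test;

   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.util.Arrays;
   import java.util.List;

   import static org.assertj.core.api.Assertions.assertThat;

   /** Placed in the same package as MongoSerdeUtils. */
   class MongoSerdeUtilsTest {

       @Test
       void serializeListRoundTrip() throws Exception {
           List<String> expected = Arrays.asList("split-0", "split-1");

           ByteArrayOutputStream baos = new ByteArrayOutputStream();
           try (DataOutputStream out = new DataOutputStream(baos)) {
               // Assumed helper: serializeList(out, list, elementWriter).
               MongoSerdeUtils.serializeList(out, expected, DataOutputStream::writeUTF);
           }

           List<String> actual;
           try (DataInputStream in =
                   new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
               // Assumed helper: deserializeList(in, elementReader).
               actual = MongoSerdeUtils.deserializeList(in, DataInputStream::readUTF);
           }

           assertThat(actual).isEqualTo(expected);
       }
   }
   ```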



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.MongoWriter;
+import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+
+import com.mongodb.client.model.WriteModel;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MongoDB sink that requests multiple {@link WriteModel bulkRequests} against a cluster for each
+ * incoming element. The following example shows how to create a MongoSink receiving records of

Review Comment:
   Sounds strange that we do multiple requests for each element; please expand on that.
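   E.g. something along these lines would read more naturally (wording is only a sketch):
   ```suggestion
    * MongoDB sink that buffers incoming elements as {@link WriteModel WriteModels} and flushes them
    * to the cluster as bulk requests. The following example shows how to create a MongoSink receiving records of
   ```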



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.MongoWriter;
+import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+
+import com.mongodb.client.model.WriteModel;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * MongoDB sink that requests multiple {@link WriteModel bulkRequests} against a cluster for each
+ * incoming element. The following example shows how to create a MongoSink receiving records of
+ * {@code Document} type.
+ *
+ * <pre>{@code
+ * MongoSink<Document> sink = MongoSink.<Document>builder()
+ *     .setUri("mongodb://user:[email protected]:27017")
+ *     .setDatabase("db")
+ *     .setCollection("coll")
+ *     .setBulkFlushMaxActions(5)
+ *     .setSerializationSchema(
+ *         (doc, context) -> new InsertOneModel<>(doc.toBsonDocument()))
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class MongoSink<IN> implements Sink<IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoWriteOptions writeOptions;
+    private final MongoSerializationSchema<IN> serializationSchema;
+
+    MongoSink(
+            MongoConnectionOptions connectionOptions,
+            MongoWriteOptions writeOptions,
+            MongoSerializationSchema<IN> serializationSchema) {
+        this.connectionOptions = checkNotNull(connectionOptions);
+        this.writeOptions = checkNotNull(writeOptions);
+        this.serializationSchema = checkNotNull(serializationSchema);

Review Comment:
   You should use the ClosureCleaner here to ensure the schema is actually serializable.
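   A sketch of what that could look like in the constructor (ClosureCleaner is in org.apache.flink.api.java):
   ```java
   import org.apache.flink.api.common.ExecutionConfig;
   import org.apache.flink.api.java.ClosureCleaner;

   // In the MongoSink constructor, after the null check:
   this.serializationSchema = checkNotNull(serializationSchema);
   ClosureCleaner.clean(
           serializationSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
   ```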



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_INTERVAL;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_MAX_ACTIONS;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The configured class for Mongo sink. */

Review Comment:
   This sentence doesn't make any sense.
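   Maybe something like:
   ```suggestion
   /** Configuration options for the MongoDB sink. */
   ```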



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer.context;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+
+/** This context provides information for {@link MongoSerializationSchema}. */
+@PublicEvolving
+public interface MongoSinkContext {

Review Comment:
   Consider extending `Sink.InitContext`
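   I.e. roughly this (a sketch; the extra accessor is only an illustration of what the interface could expose on top of the standard init context):
   ```java
   import org.apache.flink.annotation.PublicEvolving;
   import org.apache.flink.api.connector.sink2.Sink;
   import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;

   /** Context provided to the serialization schema of the MongoDB sink. */
   @PublicEvolving
   public interface MongoSinkContext extends Sink.InitContext {

       /** Returns the write options of the sink (illustrative addition). */
       MongoWriteOptions getWriteOptions();
   }
   ```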



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_INTERVAL;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_MAX_ACTIONS;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The configured class for Mongo sink. */
+@PublicEvolving
+public class MongoWriteOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int bulkFlushMaxActions;
+    private final long bulkFlushIntervalMs;
+    private final int maxRetryTimes;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final @Nullable Integer parallelism;
+
+    private MongoWriteOptions(
+            int bulkFlushMaxActions,
+            long bulkFlushIntervalMs,
+            int maxRetryTimes,
+            DeliveryGuarantee deliveryGuarantee,
+            @Nullable Integer parallelism) {
+        this.bulkFlushMaxActions = bulkFlushMaxActions;
+        this.bulkFlushIntervalMs = bulkFlushIntervalMs;
+        this.maxRetryTimes = maxRetryTimes;
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.parallelism = parallelism;
+    }
+
+    public int getBulkFlushMaxActions() {
+        return bulkFlushMaxActions;
+    }
+
+    public long getBulkFlushIntervalMs() {
+        return bulkFlushIntervalMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    @Nullable
+    public Integer getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoWriteOptions that = (MongoWriteOptions) o;
+        return bulkFlushMaxActions == that.bulkFlushMaxActions
+                && bulkFlushIntervalMs == that.bulkFlushIntervalMs
+                && maxRetryTimes == that.maxRetryTimes
+                && deliveryGuarantee == that.deliveryGuarantee
+                && Objects.equals(parallelism, that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bulkFlushMaxActions,
+                bulkFlushIntervalMs,
+                maxRetryTimes,
+                deliveryGuarantee,
+                parallelism);
+    }
+
+    public static MongoWriteOptionsBuilder builder() {
+        return new MongoWriteOptionsBuilder();
+    }
+
+    /** Builder for {@link MongoWriteOptions}. */
+    public static class MongoWriteOptionsBuilder {

Review Comment:
   Add a private constructor to enforce a single instantiation path.
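   E.g.:
   ```suggestion
       public static class MongoWriteOptionsBuilder {

           private MongoWriteOptionsBuilder() {}
   ```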



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
+import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumStateSerializer;
+import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumerator;
+import org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoScanSplitAssigner;
+import org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner;
+import org.apache.flink.connector.mongodb.source.reader.MongoSourceReader;
+import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
+import org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter;
+import org.apache.flink.connector.mongodb.source.reader.split.MongoScanSourceSplitReader;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.bson.BsonDocument;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Source implementation of MongoDB. Please use a {@link MongoSourceBuilder} to construct a

Review Comment:
   ```suggestion
     * The Source implementation of MongoDB. Use a {@link MongoSourceBuilder} to construct a
   ```



##########
pom.xml:
##########
@@ -0,0 +1,457 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>io.github.zentol.flink</groupId>
+               <artifactId>flink-connector-parent</artifactId>
+               <version>1.0</version>
+       </parent>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-mongodb-parent</artifactId>
+       <version>1.0.0-SNAPSHOT</version>
+
+       <name>Flink : Connectors : MongoDB Parent</name>
+       <packaging>pom</packaging>
+       <url>https://flink.apache.org</url>
+       <inceptionYear>2022</inceptionYear>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+                       <distribution>repo</distribution>
+               </license>
+       </licenses>
+
+       <scm>
+               <url>https://github.com/apache/flink-connector-mongodb</url>
+               <connection>[email protected]:apache/flink-connector-mongodb.git</connection>
+               <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git</developerConnection>
+       </scm>
+
+       <properties>
+               <mongodb.version>4.7.1</mongodb.version>
+
+               <flink.version>1.16-SNAPSHOT</flink.version>

Review Comment:
   ```suggestion
                <flink.version>1.16.0</flink.version>
   ```



##########
pom.xml:
##########
@@ -0,0 +1,457 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>io.github.zentol.flink</groupId>
+               <artifactId>flink-connector-parent</artifactId>
+               <version>1.0</version>
+       </parent>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-mongodb-parent</artifactId>
+       <version>1.0.0-SNAPSHOT</version>
+
+       <name>Flink : Connectors : MongoDB Parent</name>
+       <packaging>pom</packaging>
+       <url>https://flink.apache.org</url>
+       <inceptionYear>2022</inceptionYear>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+                       <distribution>repo</distribution>
+               </license>
+       </licenses>
+
+       <scm>
+               <url>https://github.com/apache/flink-connector-mongodb</url>
+               <connection>[email protected]:apache/flink-connector-mongodb.git</connection>
+               <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git</developerConnection>
+       </scm>
+
+       <properties>
+               <mongodb.version>4.7.1</mongodb.version>
+
+               <flink.version>1.16-SNAPSHOT</flink.version>
+               <flink.shaded.version>15.0</flink.shaded.version>
+
+               <junit4.version>4.13.2</junit4.version>
+               <junit5.version>5.8.1</junit5.version>
+               <hamcrest.version>1.3</hamcrest.version>
+               <assertj.version>3.21.0</assertj.version>
+               <archunit.version>0.22.0</archunit.version>
+               <testcontainers.version>1.17.2</testcontainers.version>
+               <mockito.version>3.4.6</mockito.version>
+
+               <japicmp.skip>false</japicmp.skip>
+               <japicmp.referenceVersion>1.15.0</japicmp.referenceVersion>
+
+               <slf4j.version>1.7.36</slf4j.version>
+               <log4j.version>2.17.2</log4j.version>
+
+               <flink.parent.artifactId>flink-connector-mongodb-parent</flink.parent.artifactId>
+       </properties>
+
+       <modules>
+               <module>flink-connector-mongodb</module>
+               <module>flink-sql-connector-mongodb</module>
+               <module>flink-connector-mongodb-e2e-tests</module>
+       </modules>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-force-shading</artifactId>
+                       <version>${flink.shaded.version}</version>
+               </dependency>
+
+               <!-- Root dependencies for all projects -->
+
+               <!-- Logging API -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- 'javax.annotation' classes like '@Nullable' -->
+               <dependency>
+                       <groupId>com.google.code.findbugs</groupId>
+                       <artifactId>jsr305</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.junit.vintage</groupId>
+                       <artifactId>junit-vintage-engine</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.hamcrest</groupId>
+                       <artifactId>hamcrest-all</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-inline</artifactId>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-core</artifactId>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Tests will have log4j as the default logging framework available -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-slf4j-impl</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <!-- API bridge between log4j 1 and 2 -->
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-1.2-api</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- ArchUit test dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-architecture-tests-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-architecture-tests-production</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>org.mongodb</groupId>
+                               <artifactId>mongodb-driver-sync</artifactId>
+                               <version>${mongodb.version}</version>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>org.slf4j</groupId>
+                                               <artifactId>slf4j-api</artifactId>
+                                       </exclusion>
+                               </exclusions>

Review Comment:
   ```suggestion
   ```
   There isn't really a reason to exclude this.



##########
pom.xml:
##########
@@ -0,0 +1,457 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>io.github.zentol.flink</groupId>
+               <artifactId>flink-connector-parent</artifactId>
+               <version>1.0</version>
+       </parent>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-mongodb-parent</artifactId>
+       <version>1.0.0-SNAPSHOT</version>
+
+       <name>Flink : Connectors : MongoDB Parent</name>
+       <packaging>pom</packaging>
+       <url>https://flink.apache.org</url>
+       <inceptionYear>2022</inceptionYear>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+                       <distribution>repo</distribution>
+               </license>
+       </licenses>
+
+       <scm>
+               <url>https://github.com/apache/flink-connector-mongodb</url>
+               <connection>[email protected]:apache/flink-connector-mongodb.git</connection>
+               <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git</developerConnection>
+       </scm>
+
+       <properties>
+               <mongodb.version>4.7.1</mongodb.version>
+
+               <flink.version>1.16-SNAPSHOT</flink.version>
+               <flink.shaded.version>15.0</flink.shaded.version>
+
+               <junit4.version>4.13.2</junit4.version>
+               <junit5.version>5.8.1</junit5.version>
+               <hamcrest.version>1.3</hamcrest.version>
+               <assertj.version>3.21.0</assertj.version>
+               <archunit.version>0.22.0</archunit.version>
+               <testcontainers.version>1.17.2</testcontainers.version>
+               <mockito.version>3.4.6</mockito.version>
+
+               <japicmp.skip>false</japicmp.skip>
+               <japicmp.referenceVersion>1.15.0</japicmp.referenceVersion>
+
+               <slf4j.version>1.7.36</slf4j.version>
+               <log4j.version>2.17.2</log4j.version>
+
+               <flink.parent.artifactId>flink-connector-mongodb-parent</flink.parent.artifactId>
+       </properties>
+
+       <modules>
+               <module>flink-connector-mongodb</module>
+               <module>flink-sql-connector-mongodb</module>
+               <module>flink-connector-mongodb-e2e-tests</module>
+       </modules>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-force-shading</artifactId>
+                       <version>${flink.shaded.version}</version>
+               </dependency>
+
+               <!-- Root dependencies for all projects -->
+
+               <!-- Logging API -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- 'javax.annotation' classes like '@Nullable' -->
+               <dependency>
+                       <groupId>com.google.code.findbugs</groupId>
+                       <artifactId>jsr305</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.junit.vintage</groupId>
+                       <artifactId>junit-vintage-engine</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.hamcrest</groupId>
+                       <artifactId>hamcrest-all</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-inline</artifactId>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-core</artifactId>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Tests will have log4j as the default logging framework available -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-slf4j-impl</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <!-- API bridge between log4j 1 and 2 -->
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-1.2-api</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- ArchUit test dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-architecture-tests-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-architecture-tests-production</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>org.mongodb</groupId>
+                               <artifactId>mongodb-driver-sync</artifactId>
+                               <version>${mongodb.version}</version>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>org.slf4j</groupId>
+                                               <artifactId>slf4j-api</artifactId>
+                                       </exclusion>
+                               </exclusions>
+                       </dependency>
+
+                       <!-- Flink dependencies -->
+
+                       <dependency>
+                               <groupId>org.apache.flink</groupId>
+                               <artifactId>flink-test-utils</artifactId>
+                               <version>${flink.version}</version>
+                               <scope>test</scope>
+                               <exclusions>
+                                       <exclusion>
+                                               <groupId>log4j</groupId>
+                                               <artifactId>log4j</artifactId>
+                                       </exclusion>
+                                       <exclusion>
+                                               <groupId>org.slf4j</groupId>
+                                               <artifactId>slf4j-log4j12</artifactId>
+                                       </exclusion>
+                               </exclusions>

Review Comment:
   ```suggestion
   ```
   With 1.16.0 these _should_ no longer leak through.



##########
flink-connector-mongodb/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+        packages = "org.apache.flink.connector.mongodb",
+        importOptions = {
+            ImportOption.OnlyIncludeTests.class,
+            ImportOptions.ExcludeScalaImportOption.class,
+            ImportOptions.ExcludeShadedImportOption.class
+        })
+public class TestCodeArchitectureTest {
+
+    @ArchTest
+    public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
+}

Review Comment:
   There should also be a `ProductionCodeArchitectureTest`
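   Roughly mirroring the class above (a sketch; the exact base class to reference should come from flink-architecture-tests-production):
   ```java
   package org.apache.flink.architecture;

   import org.apache.flink.architecture.common.ImportOptions;

   import com.tngtech.archunit.core.importer.ImportOption;
   import com.tngtech.archunit.junit.AnalyzeClasses;
   import com.tngtech.archunit.junit.ArchTest;
   import com.tngtech.archunit.junit.ArchTests;

   /** Architecture tests for production code. */
   @AnalyzeClasses(
           packages = "org.apache.flink.connector.mongodb",
           importOptions = {
               ImportOption.DoNotIncludeTests.class,
               ImportOptions.ExcludeScalaImportOption.class,
               ImportOptions.ExcludeShadedImportOption.class
           })
   public class ProductionCodeArchitectureTest {

       @ArchTest
       public static final ArchTests COMMON_TESTS =
               ArchTests.in(ProductionCodeArchitectureBase.class);
   }
   ```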



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.mongodb.source.MongoSource;
+import org.apache.flink.connector.mongodb.source.enumerator.assigner.MongoSplitAssigner;
+import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.TreeSet;
+
+/** The enumerator class for {@link MongoSource}. */
+@Internal
+public class MongoSourceEnumerator
+        implements SplitEnumerator<MongoSourceSplit, MongoSourceEnumState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoSourceEnumerator.class);
+
+    private final Boundedness boundedness;
+    private final SplitEnumeratorContext<MongoSourceSplit> context;
+    private final MongoSplitAssigner splitAssigner;
+    private final TreeSet<Integer> readersAwaitingSplit;
+
+    public MongoSourceEnumerator(
+            Boundedness boundedness,
+            SplitEnumeratorContext<MongoSourceSplit> context,
+            MongoSplitAssigner splitAssigner) {
+        this.boundedness = boundedness;
+        this.context = context;
+        this.splitAssigner = splitAssigner;
+        this.readersAwaitingSplit = new TreeSet<>();
+    }
+
+    @Override
+    public void start() {
+        splitAssigner.open();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        if (!context.registeredReaders().containsKey(subtaskId)) {
+            // reader failed between sending the request and now. skip this request.
+            return;
+        }
+
+        readersAwaitingSplit.add(subtaskId);
+        assignSplits();
+    }
+
+    @Override
+    public void addSplitsBack(List<MongoSourceSplit> splits, int subtaskId) {
+        LOG.debug("Mongo Source Enumerator adds splits back: {}", splits);
+        splitAssigner.addSplitsBack(splits);
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug("Adding reader {} to MongoSourceEnumerator.", subtaskId);

Review Comment:
   Why isn't this doing anything?



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/config/MongoReadOptions.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_CURSOR_BATCH_SIZE;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_FETCH_SIZE;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_PARTITION_SAMPLES;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_PARTITION_SIZE;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_PARTITION_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The configuration class for MongoDB source. */
+@PublicEvolving
+public class MongoReadOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int fetchSize;
+
+    private final int cursorBatchSize;
+
+    private final boolean noCursorTimeout;
+
+    private final PartitionStrategy partitionStrategy;
+
+    private final MemorySize partitionSize;
+
+    private final int samplesPerPartition;
+
+    private MongoReadOptions(
+            int fetchSize,
+            int cursorBatchSize,
+            boolean noCursorTimeout,
+            PartitionStrategy partitionStrategy,
+            MemorySize partitionSize,
+            int samplesPerPartition) {
+        this.fetchSize = fetchSize;
+        this.cursorBatchSize = cursorBatchSize;
+        this.noCursorTimeout = noCursorTimeout;
+        this.partitionStrategy = partitionStrategy;
+        this.partitionSize = partitionSize;
+        this.samplesPerPartition = samplesPerPartition;
+    }
+
+    public int getFetchSize() {
+        return fetchSize;
+    }
+
+    public int getCursorBatchSize() {
+        return cursorBatchSize;
+    }
+
+    public boolean isNoCursorTimeout() {
+        return noCursorTimeout;
+    }
+
+    public PartitionStrategy getPartitionStrategy() {
+        return partitionStrategy;
+    }
+
+    public MemorySize getPartitionSize() {
+        return partitionSize;
+    }
+
+    public int getSamplesPerPartition() {
+        return samplesPerPartition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoReadOptions that = (MongoReadOptions) o;
+        return cursorBatchSize == that.cursorBatchSize
+                && noCursorTimeout == that.noCursorTimeout
+                && partitionStrategy == that.partitionStrategy
+                && samplesPerPartition == that.samplesPerPartition
+                && Objects.equals(partitionSize, that.partitionSize);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                cursorBatchSize,
+                noCursorTimeout,
+                partitionStrategy,
+                partitionSize,
+                samplesPerPartition);
+    }
+
+    public static MongoReadOptionsBuilder builder() {
+        return new MongoReadOptionsBuilder();
+    }
+
+    /** Builder for {@link MongoReadOptions}. */
+    public static class MongoReadOptionsBuilder {
+        private int fetchSize = SCAN_FETCH_SIZE.defaultValue();
+        private int cursorBatchSize = SCAN_CURSOR_BATCH_SIZE.defaultValue();
+        private boolean noCursorTimeout = SCAN_CURSOR_NO_TIMEOUT.defaultValue();
+        private PartitionStrategy partitionStrategy = SCAN_PARTITION_STRATEGY.defaultValue();
+        private MemorySize partitionSize = SCAN_PARTITION_SIZE.defaultValue();
+        private int samplesPerPartition = SCAN_PARTITION_SAMPLES.defaultValue();
+
+        /**
+         * Sets the number of documents should be fetched per round-trip when reading.
+         *
+         * @param fetchSize the number of documents should be fetched per round-trip when reading.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setFetchSize(int fetchSize) {
+            checkArgument(fetchSize > 0, "The fetch size must be larger than 0.");
+            this.fetchSize = fetchSize;
+            return this;
+        }
+
+        /**
+         * Sets the batch size of MongoDB find cursor.
+         *
+         * @param cursorBatchSize the max batch size of find cursor.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setCursorBatchSize(int cursorBatchSize) {
+            checkArgument(
+                    cursorBatchSize >= 0,
+                    "The cursor batch size must be larger than or equal to 0.");
+            this.cursorBatchSize = cursorBatchSize;
+            return this;
+        }
+
+        /**
+         * Set this option to true to prevent cursor timeout (10 minutes).
+         *
+         * @param noCursorTimeout Set this option to true to prevent cursor timeout (10 minutes)
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setNoCursorTimeout(boolean noCursorTimeout) {
+            this.noCursorTimeout = noCursorTimeout;
+            return this;
+        }
+
+        /**
+         * Sets the partition strategy.
+         *
+         * @param partitionStrategy the strategy of a partition.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setPartitionStrategy(PartitionStrategy 
partitionStrategy) {
+            checkNotNull(partitionStrategy, "The partition strategy must not 
be null.");
+            this.partitionStrategy = partitionStrategy;
+            return this;
+        }
+
+        /**
+         * Sets the partition size of MongoDB split.
+         *
+         * @param partitionSize the memory size of a partition.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setPartitionSize(MemorySize 
partitionSize) {
+            checkNotNull(partitionSize, "The partition size must not be null");
+            checkArgument(
+                    partitionSize.getMebiBytes() >= 1,
+                    "The partition size must be larger than or equals to 
1mb.");

Review Comment:
   Why is a minimum partition size of 1mb required here?



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoConstants.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.common.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonMaxKey;
+import org.bson.BsonMinKey;
+import org.bson.BsonValue;
+
+/** Constants for MongoDB. */
+@PublicEvolving

Review Comment:
   How are these constants used by users?
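
   If they are only referenced by the connector internals, marking the class as internal might be the better fit. A possible sketch (assuming the constants are not meant as user-facing API):
   ```java
   /** Constants for MongoDB. */
   @Internal
   public class MongoConstants {
       // ...
   }
   ```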



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_MAX_ACTIONS;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The configured class for Mongo sink. */
+@PublicEvolving
+public class MongoWriteOptions implements Serializable {

Review Comment:
   ```suggestion
   public final class MongoWriteOptions implements Serializable {
   ```
   Apply to other classes where it makes sense.



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/config/MongoReadOptions.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_CURSOR_BATCH_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_FETCH_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_PARTITION_SAMPLES;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_PARTITION_SIZE;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SCAN_PARTITION_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The configuration class for MongoDB source. */
+@PublicEvolving
+public class MongoReadOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int fetchSize;
+
+    private final int cursorBatchSize;
+
+    private final boolean noCursorTimeout;
+
+    private final PartitionStrategy partitionStrategy;
+
+    private final MemorySize partitionSize;
+
+    private final int samplesPerPartition;
+
+    private MongoReadOptions(
+            int fetchSize,
+            int cursorBatchSize,
+            boolean noCursorTimeout,
+            PartitionStrategy partitionStrategy,
+            MemorySize partitionSize,
+            int samplesPerPartition) {
+        this.fetchSize = fetchSize;
+        this.cursorBatchSize = cursorBatchSize;
+        this.noCursorTimeout = noCursorTimeout;
+        this.partitionStrategy = partitionStrategy;
+        this.partitionSize = partitionSize;
+        this.samplesPerPartition = samplesPerPartition;
+    }
+
+    public int getFetchSize() {
+        return fetchSize;
+    }
+
+    public int getCursorBatchSize() {
+        return cursorBatchSize;
+    }
+
+    public boolean isNoCursorTimeout() {
+        return noCursorTimeout;
+    }
+
+    public PartitionStrategy getPartitionStrategy() {
+        return partitionStrategy;
+    }
+
+    public MemorySize getPartitionSize() {
+        return partitionSize;
+    }
+
+    public int getSamplesPerPartition() {
+        return samplesPerPartition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoReadOptions that = (MongoReadOptions) o;
+        return cursorBatchSize == that.cursorBatchSize
+                && noCursorTimeout == that.noCursorTimeout
+                && partitionStrategy == that.partitionStrategy
+                && samplesPerPartition == that.samplesPerPartition
+                && Objects.equals(partitionSize, that.partitionSize);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                cursorBatchSize,
+                noCursorTimeout,
+                partitionStrategy,
+                partitionSize,
+                samplesPerPartition);
+    }
+
+    public static MongoReadOptionsBuilder builder() {
+        return new MongoReadOptionsBuilder();
+    }
+
+    /** Builder for {@link MongoReadOptions}. */
+    public static class MongoReadOptionsBuilder {
+        private int fetchSize = SCAN_FETCH_SIZE.defaultValue();
+        private int cursorBatchSize = SCAN_CURSOR_BATCH_SIZE.defaultValue();
+        private boolean noCursorTimeout = 
SCAN_CURSOR_NO_TIMEOUT.defaultValue();
+        private PartitionStrategy partitionStrategy = 
SCAN_PARTITION_STRATEGY.defaultValue();
+        private MemorySize partitionSize = SCAN_PARTITION_SIZE.defaultValue();
+        private int samplesPerPartition = 
SCAN_PARTITION_SAMPLES.defaultValue();
+
+        /**
+         * Sets the number of documents should be fetched per round-trip when 
reading.
+         *
+         * @param fetchSize the number of documents should be fetched per 
round-trip when reading.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setFetchSize(int fetchSize) {
+            checkArgument(fetchSize > 0, "The fetch size must be larger than 
0.");
+            this.fetchSize = fetchSize;
+            return this;
+        }
+
+        /**
+         * Sets the batch size of MongoDB find cursor.
+         *
+         * @param cursorBatchSize the max batch size of find cursor.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setCursorBatchSize(int cursorBatchSize) 
{
+            checkArgument(
+                    cursorBatchSize >= 0,
+                    "The cursor batch size must be larger than or equal to 
0.");
+            this.cursorBatchSize = cursorBatchSize;
+            return this;
+        }
+
+        /**
+         * Set this option to true to prevent cursor timeout (10 minutes).
+         *
+         * @param noCursorTimeout Set this option to true to prevent cursor 
timeout (10 minutes)
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setNoCursorTimeout(boolean 
noCursorTimeout) {
+            this.noCursorTimeout = noCursorTimeout;
+            return this;
+        }
+
+        /**
+         * Sets the partition strategy.
+         *
+         * @param partitionStrategy the strategy of a partition.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setPartitionStrategy(PartitionStrategy 
partitionStrategy) {
+            checkNotNull(partitionStrategy, "The partition strategy must not 
be null.");
+            this.partitionStrategy = partitionStrategy;
+            return this;
+        }
+
+        /**
+         * Sets the partition size of MongoDB split.
+         *
+         * @param partitionSize the memory size of a partition.
+         * @return this builder
+         */
+        public MongoReadOptionsBuilder setPartitionSize(MemorySize 
partitionSize) {
+            checkNotNull(partitionSize, "The partition size must not be null");
+            checkArgument(
+                    partitionSize.getMebiBytes() >= 1,
+                    "The partition size must be larger than or equals to 
1mb.");
+            this.partitionSize = partitionSize;
+            return this;
+        }
+
+        /**
+         * Sets the partition size of MongoDB split.

Review Comment:
   The javadocs are identical to those of setPartitionSize; this method presumably needs its own description for samplesPerPartition.
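
   Something along these lines could work, reusing the wording from MongoSourceBuilder#setSamplesPerPartition (just a sketch of possible wording):
   ```java
   /**
    * Sets the number of samples to take per partition, which is only effective for the sample
    * partition strategy.
    *
    * @param samplesPerPartition the number of samples to take per partition.
    * @return this builder
    */
   ```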



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import 
org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContextImpl;
+import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to write records in a MongoDB collection.

Review Comment:
   ```suggestion
    * This class is responsible for writing records to a MongoDB collection.
   ```



##########
pom.xml:
##########
@@ -0,0 +1,457 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
+       xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>io.github.zentol.flink</groupId>
+               <artifactId>flink-connector-parent</artifactId>
+               <version>1.0</version>
+       </parent>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-mongodb-parent</artifactId>
+       <version>1.0.0-SNAPSHOT</version>
+
+       <name>Flink : Connectors : MongoDB Parent</name>
+       <packaging>pom</packaging>
+       <url>https://flink.apache.org</url>
+       <inceptionYear>2022</inceptionYear>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+                       <distribution>repo</distribution>
+               </license>
+       </licenses>
+
+       <scm>
+               <url>https://github.com/apache/flink-connector-mongodb</url>
+               
<connection>[email protected]:apache/flink-connector-mongodb.git</connection>
+               
<developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git</developerConnection>
+       </scm>
+
+       <properties>
+               <mongodb.version>4.7.1</mongodb.version>
+
+               <flink.version>1.16-SNAPSHOT</flink.version>
+               <flink.shaded.version>15.0</flink.shaded.version>
+
+               <junit4.version>4.13.2</junit4.version>
+               <junit5.version>5.8.1</junit5.version>
+               <hamcrest.version>1.3</hamcrest.version>
+               <assertj.version>3.21.0</assertj.version>
+               <archunit.version>0.22.0</archunit.version>
+               <testcontainers.version>1.17.2</testcontainers.version>
+               <mockito.version>3.4.6</mockito.version>
+
+               <japicmp.skip>false</japicmp.skip>
+               <japicmp.referenceVersion>1.15.0</japicmp.referenceVersion>
+
+               <slf4j.version>1.7.36</slf4j.version>
+               <log4j.version>2.17.2</log4j.version>
+
+               
<flink.parent.artifactId>flink-connector-mongodb-parent</flink.parent.artifactId>
+       </properties>
+
+       <modules>
+               <module>flink-connector-mongodb</module>
+               <module>flink-sql-connector-mongodb</module>
+               <module>flink-connector-mongodb-e2e-tests</module>
+       </modules>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-force-shading</artifactId>
+                       <version>${flink.shaded.version}</version>
+               </dependency>
+
+               <!-- Root dependencies for all projects -->
+
+               <!-- Logging API -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- 'javax.annotation' classes like '@Nullable' -->
+               <dependency>
+                       <groupId>com.google.code.findbugs</groupId>
+                       <artifactId>jsr305</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.junit.vintage</groupId>
+                       <artifactId>junit-vintage-engine</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.hamcrest</groupId>
+                       <artifactId>hamcrest-all</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-inline</artifactId>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-core</artifactId>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Tests will have log4j as the default logging framework 
available -->
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-slf4j-impl</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <!-- API bridge between log4j 1 and 2 -->
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-1.2-api</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- ArchUit test dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-architecture-tests-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-architecture-tests-production</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>org.mongodb</groupId>
+                               <artifactId>mongodb-driver-sync</artifactId>

Review Comment:
   Why did you choose this client instead of the Reactive Streams Driver?
   Isn't the sync client potentially blocking the source/sink thread for a long 
time?



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_MAX_ACTIONS;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The configured class for Mongo sink. */
+@PublicEvolving
+public class MongoWriteOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int bulkFlushMaxActions;
+    private final long bulkFlushIntervalMs;
+    private final int maxRetryTimes;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final @Nullable Integer parallelism;
+
+    private MongoWriteOptions(
+            int bulkFlushMaxActions,
+            long bulkFlushIntervalMs,
+            int maxRetryTimes,
+            DeliveryGuarantee deliveryGuarantee,
+            @Nullable Integer parallelism) {
+        this.bulkFlushMaxActions = bulkFlushMaxActions;
+        this.bulkFlushIntervalMs = bulkFlushIntervalMs;
+        this.maxRetryTimes = maxRetryTimes;
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.parallelism = parallelism;
+    }
+
+    public int getBulkFlushMaxActions() {
+        return bulkFlushMaxActions;
+    }
+
+    public long getBulkFlushIntervalMs() {
+        return bulkFlushIntervalMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    @Nullable
+    public Integer getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoWriteOptions that = (MongoWriteOptions) o;
+        return bulkFlushMaxActions == that.bulkFlushMaxActions
+                && bulkFlushIntervalMs == that.bulkFlushIntervalMs
+                && maxRetryTimes == that.maxRetryTimes
+                && deliveryGuarantee == that.deliveryGuarantee
+                && Objects.equals(parallelism, that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bulkFlushMaxActions,
+                bulkFlushIntervalMs,
+                maxRetryTimes,
+                deliveryGuarantee,
+                parallelism);
+    }
+
+    public static MongoWriteOptionsBuilder builder() {
+        return new MongoWriteOptionsBuilder();
+    }
+
+    /** Builder for {@link MongoWriteOptions}. */
+    public static class MongoWriteOptionsBuilder {
+        private int bulkFlushMaxActions = 
BULK_FLUSH_MAX_ACTIONS.defaultValue();
+        private long bulkFlushIntervalMs = 
BULK_FLUSH_INTERVAL.defaultValue().toMillis();
+        private int maxRetryTimes = SINK_MAX_RETRIES.defaultValue();
+        private DeliveryGuarantee deliveryGuarantee = 
DeliveryGuarantee.AT_LEAST_ONCE;
+        private @Nullable Integer parallelism;
+
+        /**
+         * Sets the maximum number of actions to buffer for each bulk request. 
You can pass -1 to
+         * disable it. The default flush size 1000.
+         *
+         * @param numMaxActions the maximum number of actions to buffer per 
bulk request.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushMaxActions(int 
numMaxActions) {
+            checkState(
+                    numMaxActions == -1 || numMaxActions > 0,
+                    "Max number of buffered actions must be larger than 0.");
+            this.bulkFlushMaxActions = numMaxActions;
+            return this;
+        }
+
+        /**
+         * Sets the bulk flush interval, in milliseconds. You can pass -1 to 
disable it.
+         *
+         * @param intervalMillis the bulk flush interval, in milliseconds.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushIntervalMs(long 
intervalMillis) {
+            checkState(
+                    intervalMillis == -1 || intervalMillis >= 0,
+                    "Interval (in milliseconds) between each flush must be 
larger than "
+                            + "or equal to 0.");
+            this.bulkFlushIntervalMs = intervalMillis;
+            return this;
+        }
+
+        /**
+         * Sets the max retry times if writing records failed.
+         *
+         * @param maxRetryTimes the max retry times.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setMaxRetryTimes(int maxRetryTimes) {
+            checkArgument(
+                    maxRetryTimes >= 0, "The max retry times must be larger 
than or equal to 0.");
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        /**
+         * Sets the wanted {@link DeliveryGuarantee}. The default delivery 
guarantee is {@link
+         * DeliveryGuarantee#NONE}

Review Comment:
   The default shouldn't drop data.
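
   The builder field above already defaults to DeliveryGuarantee.AT_LEAST_ONCE, so possibly only the javadoc is out of date. If that is the intent, it could read (sketch):
   ```java
   /**
    * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is {@link
    * DeliveryGuarantee#AT_LEAST_ONCE}.
    */
   ```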



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
+import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+import 
org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
+
+import org.bson.BsonDocument;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.CollectionUtil.isNullOrEmpty;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link MongoSource} to make it easier for the users 
to construct a {@link
+ * MongoSource}.
+ *
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public class MongoSourceBuilder<OUT> {
+
+    private final MongoConnectionOptions.MongoConnectionOptionsBuilder 
connectionOptionsBuilder;
+    private final MongoReadOptions.MongoReadOptionsBuilder readOptionsBuilder;
+
+    private List<String> projectedFields;
+    private int limit = -1;
+    private MongoDeserializationSchema<OUT> deserializationSchema;
+
+    MongoSourceBuilder() {
+        this.connectionOptionsBuilder = MongoConnectionOptions.builder();
+        this.readOptionsBuilder = MongoReadOptions.builder();
+    }
+
+    /**
+     * Sets the connection string of MongoDB.
+     *
+     * @param uri connection string of MongoDB
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setUri(String uri) {
+        connectionOptionsBuilder.setUri(uri);
+        return this;
+    }
+
+    /**
+     * Sets the database to sink of MongoDB.
+     *
+     * @param database the database to sink of MongoDB.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setDatabase(String database) {
+        connectionOptionsBuilder.setDatabase(database);
+        return this;
+    }
+
+    /**
+     * Sets the collection to sink of MongoDB.
+     *
+     * @param collection the collection to sink of MongoDB.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setCollection(String collection) {
+        connectionOptionsBuilder.setCollection(collection);
+        return this;
+    }
+
+    /**
+     * Sets the number of documents should be fetched per round-trip when 
reading.
+     *
+     * @param fetchSize the number of documents should be fetched per 
round-trip when reading.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setFetchSize(int fetchSize) {
+        readOptionsBuilder.setFetchSize(fetchSize);
+        return this;
+    }
+
+    /**
+     * Sets the batch size of MongoDB find cursor.
+     *
+     * @param cursorBatchSize the max batch size of find cursor.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setCursorBatchSize(int cursorBatchSize) {
+        readOptionsBuilder.setCursorBatchSize(cursorBatchSize);
+        return this;
+    }
+
+    /**
+     * Set this option to true to prevent cursor timeout (defaults to 10 
minutes).
+     *
+     * @param noCursorTimeout Set this option to true to prevent cursor 
timeout.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setNoCursorTimeout(boolean noCursorTimeout) 
{
+        readOptionsBuilder.setNoCursorTimeout(noCursorTimeout);
+        return this;
+    }
+
+    /**
+     * Sets the partition strategy.
+     *
+     * @param partitionStrategy the strategy of a partition.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setPartitionStrategy(PartitionStrategy 
partitionStrategy) {
+        readOptionsBuilder.setPartitionStrategy(partitionStrategy);
+        return this;
+    }
+
+    /**
+     * Sets the partition size of MongoDB split.
+     *
+     * @param partitionSize the memory size of a partition.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setPartitionSize(MemorySize partitionSize) {
+        readOptionsBuilder.setPartitionSize(partitionSize);
+        return this;
+    }
+
+    /**
+     * Sets the samples size per partition only effective for sample partition 
strategy.
+     *
+     * @param samplesPerPartition the samples size per partition
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setSamplesPerPartition(int 
samplesPerPartition) {
+        readOptionsBuilder.setSamplesPerPartition(samplesPerPartition);
+        return this;
+    }
+
+    /**
+     * Sets the limit of documents to read.

Review Comment:
   What happens if limit is not set?
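
   The field defaults to -1, which presumably means "no limit"; if so, stating that in the javadoc and validating the argument would help. A minimal sketch (the validation message is made up):
   ```java
   /**
    * Sets the limit of documents to read. If not set (or set to -1), no limit is applied.
    *
    * @param limit the maximum number of documents to read.
    * @return this builder
    */
   public MongoSourceBuilder<OUT> setLimit(int limit) {
       checkArgument(limit == -1 || limit > 0, "The limit must be -1 (no limit) or positive.");
       this.limit = limit;
       return this;
   }
   ```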



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import 
org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContextImpl;
+import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to write records in a MongoDB collection.
+ *
+ * @param <IN> The type of the input elements.
+ */
+@Internal
+public class MongoWriter<IN> implements SinkWriter<IN> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoWriter.class);
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoWriteOptions writeOptions;
+    private final MongoSerializationSchema<IN> serializationSchema;
+    private final MongoSinkContext sinkContext;
+    private final MailboxExecutor mailboxExecutor;
+    private final boolean flushOnCheckpoint;
+    private final List<WriteModel<BsonDocument>> bulkRequests = new 
ArrayList<>();
+    private final Collector<WriteModel<BsonDocument>> collector;
+    private final MongoClient mongoClient;
+
+    private boolean checkpointInProgress = false;
+    private volatile long lastSendTime = 0L;
+    private volatile long ackTime = Long.MAX_VALUE;
+
+    public MongoWriter(
+            MongoConnectionOptions connectionOptions,
+            MongoWriteOptions writeOptions,
+            boolean flushOnCheckpoint,
+            Sink.InitContext initContext,
+            MongoSerializationSchema<IN> serializationSchema) {
+        this.connectionOptions = checkNotNull(connectionOptions);
+        this.writeOptions = checkNotNull(writeOptions);
+        this.serializationSchema = checkNotNull(serializationSchema);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+
+        checkNotNull(initContext);
+        this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());
+
+        SinkWriterMetricGroup metricGroup = 
checkNotNull(initContext.metricGroup());
+        metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
+
+        this.collector =
+                new CountingCollector<>(
+                        new ListCollector<>(this.bulkRequests),
+                        metricGroup.getNumRecordsSendCounter());
+
+        // Initialize the serialization schema.
+        this.sinkContext = new MongoSinkContextImpl(initContext, writeOptions);
+        try {
+            SerializationSchema.InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            serializationSchema.open(initializationContext, sinkContext, 
writeOptions);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to open the MongoEmitter", 
e);
+        }
+
+        // Initialize the mongo client.
+        this.mongoClient = MongoClients.create(connectionOptions.getUri());
+    }
+
+    @Override
+    public void write(IN element, Context context) throws IOException, 
InterruptedException {
+        // do not allow new bulk writes until all actions are flushed
+        while (checkpointInProgress) {
+            mailboxExecutor.yield();
+        }
+        collector.collect(serializationSchema.serialize(element, sinkContext));
+        if (isOverMaxActionsLimit() || isOverMaxFlushIntervalLimit()) {
+            doBulkWrite();
+        }
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        checkpointInProgress = true;
+        while (!bulkRequests.isEmpty() && (flushOnCheckpoint || endOfInput)) {
+            doBulkWrite();
+        }
+        checkpointInProgress = false;
+    }
+
+    @Override
+    public void close() {
+        if (mongoClient != null) {

Review Comment:
   This condition is always true.
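
   Since mongoClient is assigned unconditionally in the constructor, the check could simply be dropped (sketch):
   ```java
   @Override
   public void close() {
       mongoClient.close();
   }
   ```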



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContextImpl.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer.context;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+
+/** An implementation that would contain all the required context. */
+@Internal
+public class MongoSinkContextImpl implements MongoSinkContext {

Review Comment:
   ```suggestion
   public class DefaultMongoSinkContext implements MongoSinkContext {
   ```
   It's usually more consistent to add such a qualifier to the front of the 
class name.



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/serializer/MongoSerializationSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer.serializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema for how to serialize records into MongoDB.
+ *
+ * @param <IN> The message type send to MongoDB.
+ */
+@PublicEvolving
+public interface MongoSerializationSchema<IN> extends Serializable {
+
+    /**
+     * Initialization method for the schema. It is called before the actual 
working methods {@link
+     * #serialize(Object, MongoSinkContext)} and thus suitable for one-time 
setup work.
+     *
+     * <p>The provided {@link SerializationSchema.InitializationContext} can 
be used to access
+     * additional features such as registering user metrics.
+     *
+     * @param initializationContext Contextual information that can be used 
during initialization.
+     * @param sinkContext Runtime information i.e. partitions, subtaskId.
+     * @param sinkConfiguration All the configure options for the MongoDB 
sink. You can add custom

Review Comment:
   How's that supposed to work?



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import 
org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContextImpl;
+import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to write records in a MongoDB collection.
+ *
+ * @param <IN> The type of the input elements.
+ */
+@Internal
+public class MongoWriter<IN> implements SinkWriter<IN> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoWriter.class);
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoWriteOptions writeOptions;
+    private final MongoSerializationSchema<IN> serializationSchema;
+    private final MongoSinkContext sinkContext;
+    private final MailboxExecutor mailboxExecutor;
+    private final boolean flushOnCheckpoint;
+    private final List<WriteModel<BsonDocument>> bulkRequests = new 
ArrayList<>();
+    private final Collector<WriteModel<BsonDocument>> collector;
+    private final MongoClient mongoClient;
+
+    private boolean checkpointInProgress = false;
+    private volatile long lastSendTime = 0L;
+    private volatile long ackTime = Long.MAX_VALUE;
+
+    public MongoWriter(
+            MongoConnectionOptions connectionOptions,
+            MongoWriteOptions writeOptions,
+            boolean flushOnCheckpoint,
+            Sink.InitContext initContext,
+            MongoSerializationSchema<IN> serializationSchema) {
+        this.connectionOptions = checkNotNull(connectionOptions);
+        this.writeOptions = checkNotNull(writeOptions);
+        this.serializationSchema = checkNotNull(serializationSchema);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+
+        checkNotNull(initContext);
+        this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());
+
+        SinkWriterMetricGroup metricGroup = 
checkNotNull(initContext.metricGroup());
+        metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
+
+        this.collector =
+                new CountingCollector<>(
+                        new ListCollector<>(this.bulkRequests),
+                        metricGroup.getNumRecordsSendCounter());
+
+        // Initialize the serialization schema.
+        this.sinkContext = new MongoSinkContextImpl(initContext, writeOptions);
+        try {
+            SerializationSchema.InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            serializationSchema.open(initializationContext, sinkContext, 
writeOptions);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to open the MongoEmitter", 
e);
+        }
+
+        // Initialize the mongo client.
+        this.mongoClient = MongoClients.create(connectionOptions.getUri());
+    }
+
+    @Override
+    public void write(IN element, Context context) throws IOException, 
InterruptedException {
+        // do not allow new bulk writes until all actions are flushed
+        while (checkpointInProgress) {
+            mailboxExecutor.yield();
+        }
+        collector.collect(serializationSchema.serialize(element, sinkContext));
+        if (isOverMaxActionsLimit() || isOverMaxFlushIntervalLimit()) {
+            doBulkWrite();
+        }
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        checkpointInProgress = true;
+        while (!bulkRequests.isEmpty() && (flushOnCheckpoint || endOfInput)) {
+            doBulkWrite();
+        }
+        checkpointInProgress = false;
+    }
+
+    @Override
+    public void close() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @VisibleForTesting
+    void doBulkWrite() throws IOException {
+        if (bulkRequests.isEmpty()) {
+            // no records to write
+            return;
+        }
+
+        int maxRetryTimes = writeOptions.getMaxRetryTimes();
+        for (int i = 0; i <= maxRetryTimes; i++) {
+            try {
+                lastSendTime = System.currentTimeMillis();
+                mongoClient
+                        .getDatabase(connectionOptions.getDatabase())
+                        .getCollection(connectionOptions.getCollection(), 
BsonDocument.class)
+                        .bulkWrite(bulkRequests);
+                ackTime = System.currentTimeMillis();
+                bulkRequests.clear();
+                break;
+            } catch (MongoException e) {
+                LOG.error("Bulk Write to MongoDB failed, retry times = {}", i, 
e);
+                if (i >= maxRetryTimes) {
+                    throw new IOException(e);
+                }
+                try {
+                    Thread.sleep(1000L * i);

Review Comment:
   This retry interval should be configurable.
   
   The first retry also doesn't wait at all (1000L * i is 0 on the first attempt), which is likely not intended.
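
   A minimal sketch of what this could look like, assuming a new (hypothetical) retryIntervalMs option on MongoWriteOptions:
   ```java
   } catch (MongoException e) {
       LOG.error("Bulk write to MongoDB failed, retry times = {}", i, e);
       if (i >= maxRetryTimes) {
           throw new IOException(e);
       }
       try {
           // Wait before every retry, including the first one; back off linearly.
           // getRetryIntervalMs() is an assumed new option, not part of this PR.
           Thread.sleep(writeOptions.getRetryIntervalMs() * (i + 1));
       } catch (InterruptedException ex) {
           Thread.currentThread().interrupt();
           throw new IOException(ex);
       }
   }
   ```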



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContextImpl.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer.context;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+
+/** An implementation that would contain all the required context. */

Review Comment:
   ```suggestion
   /** Default {@link MongoSinkContext} implementation. */
   ```



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_MAX_ACTIONS;
+import static 
org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The configured class for Mongo sink. */
+@PublicEvolving
+public class MongoWriteOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int bulkFlushMaxActions;
+    private final long bulkFlushIntervalMs;
+    private final int maxRetryTimes;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final @Nullable Integer parallelism;
+
+    private MongoWriteOptions(
+            int bulkFlushMaxActions,
+            long bulkFlushIntervalMs,
+            int maxRetryTimes,
+            DeliveryGuarantee deliveryGuarantee,
+            @Nullable Integer parallelism) {
+        this.bulkFlushMaxActions = bulkFlushMaxActions;
+        this.bulkFlushIntervalMs = bulkFlushIntervalMs;
+        this.maxRetryTimes = maxRetryTimes;
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.parallelism = parallelism;
+    }
+
+    public int getBulkFlushMaxActions() {
+        return bulkFlushMaxActions;
+    }
+
+    public long getBulkFlushIntervalMs() {
+        return bulkFlushIntervalMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    @Nullable
+    public Integer getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoWriteOptions that = (MongoWriteOptions) o;
+        return bulkFlushMaxActions == that.bulkFlushMaxActions
+                && bulkFlushIntervalMs == that.bulkFlushIntervalMs
+                && maxRetryTimes == that.maxRetryTimes
+                && deliveryGuarantee == that.deliveryGuarantee
+                && Objects.equals(parallelism, that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bulkFlushMaxActions,
+                bulkFlushIntervalMs,
+                maxRetryTimes,
+                deliveryGuarantee,
+                parallelism);
+    }
+
+    public static MongoWriteOptionsBuilder builder() {
+        return new MongoWriteOptionsBuilder();
+    }
+
+    /** Builder for {@link MongoWriteOptions}. */
+    public static class MongoWriteOptionsBuilder {
+        private int bulkFlushMaxActions = BULK_FLUSH_MAX_ACTIONS.defaultValue();
+        private long bulkFlushIntervalMs = BULK_FLUSH_INTERVAL.defaultValue().toMillis();
+        private int maxRetryTimes = SINK_MAX_RETRIES.defaultValue();
+        private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
+        private @Nullable Integer parallelism;
+
+        /**
+         * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
+         * disable it. The default flush size is 1000.
+         *
+         * @param numMaxActions the maximum number of actions to buffer per bulk request.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushMaxActions(int numMaxActions) {
+            checkState(
+                    numMaxActions == -1 || numMaxActions > 0,
+                    "Max number of buffered actions must be larger than 0.");
+            this.bulkFlushMaxActions = numMaxActions;
+            return this;
+        }
+
+        /**
+         * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
+         *
+         * @param intervalMillis the bulk flush interval, in milliseconds.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushIntervalMs(long intervalMillis) {
+            checkState(
+                    intervalMillis == -1 || intervalMillis >= 0,
+                    "Interval (in milliseconds) between each flush must be larger than "
+                            + "or equal to 0.");
+            this.bulkFlushIntervalMs = intervalMillis;
+            return this;
+        }
+
+        /**
+         * Sets the max retry times if writing records failed.
+         *
+         * @param maxRetryTimes the max retry times.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setMaxRetryTimes(int maxRetryTimes) {
+            checkArgument(
+                    maxRetryTimes >= 0, "The max retry times must be larger than or equal to 0.");
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        /**
+         * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is {@link
+         * DeliveryGuarantee#AT_LEAST_ONCE}.
+         *
+         * @param deliveryGuarantee which describes the record emission behaviour
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
+            checkState(
+                    deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE,
+                    "Mongo sink does not support the EXACTLY_ONCE guarantee.");
+            this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
+            return this;
+        }
+
+        /**
+         * Sets the write parallelism.

Review Comment:
   This needs a better description. Is it something like maxConcurrentBulks? 
Why should you use this over increasing the sink parallelism?
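If this is simply the parallelism of the sink operator (i.e. what a `sink.parallelism`-style option configures for other connectors), the Javadoc could spell that out, e.g. (sketch only, to be confirmed by the author):

```java
        // Sketch only: assumes 'parallelism' maps to the parallelism of the sink operator,
        // analogous to a 'sink.parallelism' table option.
        /**
         * Sets the parallelism of the MongoDB sink operator, overriding the parallelism the sink
         * would otherwise inherit from the pipeline. It does not control how many bulk requests a
         * single writer issues concurrently.
         *
         * @param parallelism the parallelism of the sink operator
         * @return this builder
         */
```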



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_INTERVAL;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.BULK_FLUSH_MAX_ACTIONS;
+import static org.apache.flink.connector.mongodb.table.config.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The configuration class for the MongoDB sink. */
+@PublicEvolving
+public class MongoWriteOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int bulkFlushMaxActions;
+    private final long bulkFlushIntervalMs;
+    private final int maxRetryTimes;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final @Nullable Integer parallelism;
+
+    private MongoWriteOptions(
+            int bulkFlushMaxActions,
+            long bulkFlushIntervalMs,
+            int maxRetryTimes,
+            DeliveryGuarantee deliveryGuarantee,
+            @Nullable Integer parallelism) {
+        this.bulkFlushMaxActions = bulkFlushMaxActions;
+        this.bulkFlushIntervalMs = bulkFlushIntervalMs;
+        this.maxRetryTimes = maxRetryTimes;
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.parallelism = parallelism;
+    }
+
+    public int getBulkFlushMaxActions() {
+        return bulkFlushMaxActions;
+    }
+
+    public long getBulkFlushIntervalMs() {
+        return bulkFlushIntervalMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    @Nullable
+    public Integer getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoWriteOptions that = (MongoWriteOptions) o;
+        return bulkFlushMaxActions == that.bulkFlushMaxActions
+                && bulkFlushIntervalMs == that.bulkFlushIntervalMs
+                && maxRetryTimes == that.maxRetryTimes
+                && deliveryGuarantee == that.deliveryGuarantee
+                && Objects.equals(parallelism, that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bulkFlushMaxActions,
+                bulkFlushIntervalMs,
+                maxRetryTimes,
+                deliveryGuarantee,
+                parallelism);
+    }
+
+    public static MongoWriteOptionsBuilder builder() {
+        return new MongoWriteOptionsBuilder();
+    }
+
+    /** Builder for {@link MongoWriteOptions}. */
+    public static class MongoWriteOptionsBuilder {
+        private int bulkFlushMaxActions = BULK_FLUSH_MAX_ACTIONS.defaultValue();
+        private long bulkFlushIntervalMs = BULK_FLUSH_INTERVAL.defaultValue().toMillis();
+        private int maxRetryTimes = SINK_MAX_RETRIES.defaultValue();
+        private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
+        private @Nullable Integer parallelism;
+
+        /**
+         * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
+         * disable it. The default flush size is 1000.
+         *
+         * @param numMaxActions the maximum number of actions to buffer per bulk request.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushMaxActions(int numMaxActions) {
+            checkState(
+                    numMaxActions == -1 || numMaxActions > 0,
+                    "Max number of buffered actions must be larger than 0.");
+            this.bulkFlushMaxActions = numMaxActions;
+            return this;
+        }
+
+        /**
+         * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
+         *
+         * @param intervalMillis the bulk flush interval, in milliseconds.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushIntervalMs(long intervalMillis) {
+            checkState(
+                    intervalMillis == -1 || intervalMillis >= 0,
+                    "Interval (in milliseconds) between each flush must be larger than "
+                            + "or equal to 0.");
+            this.bulkFlushIntervalMs = intervalMillis;
+            return this;
+        }
+
+        /**
+         * Sets the max retry times if writing records failed.
+         *
+         * @param maxRetryTimes the max retry times.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setMaxRetryTimes(int maxRetryTimes) {
+            checkArgument(
+                    maxRetryTimes >= 0, "The max retry times must be larger than or equal to 0.");
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        /**
+         * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is {@link
+         * DeliveryGuarantee#AT_LEAST_ONCE}.
+         *
+         * @param deliveryGuarantee which describes the record emission behaviour
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
+            checkState(
+                    deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE,
+                    "Mongo sink does not support the EXACTLY_ONCE guarantee.");
+            this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
+            return this;
+        }
+
+        /**
+         * Sets the write parallelism.
+         *
+         * @param parallelism the write parallelism
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setParallelism(Integer parallelism) {
+            checkArgument(
+                    parallelism == null || parallelism > 0,
+                    "Mongo sink parallelism must be larger than 0.");
+            this.parallelism = parallelism;
+            return this;
+        }

Review Comment:
   ```suggestion
           public MongoWriteOptionsBuilder setParallelism(int parallelism) {
               checkArgument(parallelism > 0,
                       "Mongo sink parallelism must be larger than 0.");
               this.parallelism = parallelism;
               return this;
           }
   ```
   use an int



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/serializer/MongoSerializationSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer.serializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema for serializing records into MongoDB.
+ *
+ * @param <IN> The message type sent to MongoDB.
+ */
+@PublicEvolving
+public interface MongoSerializationSchema<IN> extends Serializable {
+
+    /**
+     * Initialization method for the schema. It is called before the actual working methods {@link
+     * #serialize(Object, MongoSinkContext)} and thus suitable for one-time setup work.
+     *
+     * <p>The provided {@link SerializationSchema.InitializationContext} can be used to access
+     * additional features such as registering user metrics.
+     *
+     * @param initializationContext Contextual information that can be used during initialization.
+     * @param sinkContext Runtime information i.e. partitions, subtaskId.
+     * @param sinkConfiguration All the configuration options for the MongoDB sink. You can add custom
+     *     options.
+     */
+    default void open(
+            SerializationSchema.InitializationContext initializationContext,
+            MongoSinkContext sinkContext,
+            MongoWriteOptions sinkConfiguration)

Review Comment:
   What are the use-cases for passing the sinkConfiguration?
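One conceivable use-case (not necessarily what was intended here): a schema that adapts its one-time setup to the configured write options. A rough, illustrative sketch, assuming `serialize` returns a `WriteModel<BsonDocument>` as the imports suggest; the class and field names are made up:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;

import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;

/** Hypothetical schema that reads the sink configuration during open(); for illustration only. */
class BufferingAwareSchema implements MongoSerializationSchema<BsonDocument> {

    // Derived from the sink configuration in open(), e.g. to pre-size internal state.
    private transient int expectedBatchSize;

    @Override
    public void open(
            SerializationSchema.InitializationContext initializationContext,
            MongoSinkContext sinkContext,
            MongoWriteOptions sinkConfiguration) {
        expectedBatchSize = sinkConfiguration.getBulkFlushMaxActions();
    }

    @Override
    public WriteModel<BsonDocument> serialize(BsonDocument element, MongoSinkContext sinkContext) {
        // Plain inserts; a real schema would build upserts/updates as needed.
        return new InsertOneModel<>(element);
    }
}
```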



##########
flink-connector-mongodb/pom.xml:
##########
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
+       xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <artifactId>flink-connector-mongodb-parent</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.0.0-SNAPSHOT</version>
+               <relativePath>../pom.xml</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-mongodb</artifactId>
+       <name>Flink : Connectors : MongoDB</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <!-- Core -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- MongoDB -->
+
+               <dependency>
+                       <groupId>org.mongodb</groupId>
+                       <artifactId>mongodb-driver-sync</artifactId>
+               </dependency>
+
+               <!-- Table ecosystem -->
+
+               <!-- Projects depending on this project won't depend on flink-table-*. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java-bridge</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+                       <optional>true</optional>
+               </dependency>
+
+               <!-- Tests -->
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>mongodb</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-test-utils</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Table API integration tests -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner-loader</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-runtime</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- ArchUnit test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-architecture-tests-test</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>

Review Comment:
Who is the intended consumer for the test-jar, and for what purpose?



##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContextImpl;
+import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records into a MongoDB collection.
+ *
+ * @param <IN> The type of the input elements.
+ */
+@Internal
+public class MongoWriter<IN> implements SinkWriter<IN> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoWriter.class);
+
+    private final MongoConnectionOptions connectionOptions;
+    private final MongoWriteOptions writeOptions;
+    private final MongoSerializationSchema<IN> serializationSchema;
+    private final MongoSinkContext sinkContext;
+    private final MailboxExecutor mailboxExecutor;
+    private final boolean flushOnCheckpoint;
+    private final List<WriteModel<BsonDocument>> bulkRequests = new ArrayList<>();
+    private final Collector<WriteModel<BsonDocument>> collector;
+    private final MongoClient mongoClient;
+
+    private boolean checkpointInProgress = false;
+    private volatile long lastSendTime = 0L;
+    private volatile long ackTime = Long.MAX_VALUE;
+
+    public MongoWriter(
+            MongoConnectionOptions connectionOptions,
+            MongoWriteOptions writeOptions,
+            boolean flushOnCheckpoint,
+            Sink.InitContext initContext,
+            MongoSerializationSchema<IN> serializationSchema) {
+        this.connectionOptions = checkNotNull(connectionOptions);
+        this.writeOptions = checkNotNull(writeOptions);
+        this.serializationSchema = checkNotNull(serializationSchema);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+
+        checkNotNull(initContext);
+        this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());
+
+        SinkWriterMetricGroup metricGroup = checkNotNull(initContext.metricGroup());
+        metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
+
+        this.collector =
+                new CountingCollector<>(
+                        new ListCollector<>(this.bulkRequests),
+                        metricGroup.getNumRecordsSendCounter());
+
+        // Initialize the serialization schema.
+        this.sinkContext = new MongoSinkContextImpl(initContext, writeOptions);
+        try {
+            SerializationSchema.InitializationContext initializationContext =
+                    initContext.asSerializationSchemaInitializationContext();
+            serializationSchema.open(initializationContext, sinkContext, writeOptions);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to open the MongoEmitter", e);
+        }
+
+        // Initialize the mongo client.
+        this.mongoClient = MongoClients.create(connectionOptions.getUri());
+    }
+
+    @Override
+    public void write(IN element, Context context) throws IOException, InterruptedException {
+        // do not allow new bulk writes until all actions are flushed
+        while (checkpointInProgress) {
+            mailboxExecutor.yield();
+        }
+        collector.collect(serializationSchema.serialize(element, sinkContext));
+        if (isOverMaxActionsLimit() || isOverMaxFlushIntervalLimit()) {
+            doBulkWrite();
+        }
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        checkpointInProgress = true;
+        while (!bulkRequests.isEmpty() && (flushOnCheckpoint || endOfInput)) {
+            doBulkWrite();
+        }
+        checkpointInProgress = false;
+    }
+
+    @Override
+    public void close() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @VisibleForTesting
+    void doBulkWrite() throws IOException {
+        if (bulkRequests.isEmpty()) {
+            // no records to write
+            return;
+        }
+
+        int maxRetryTimes = writeOptions.getMaxRetryTimes();
+        for (int i = 0; i <= maxRetryTimes; i++) {
+            try {
+                lastSendTime = System.currentTimeMillis();
+                mongoClient
+                        .getDatabase(connectionOptions.getDatabase())
+                        .getCollection(connectionOptions.getCollection(), BsonDocument.class)
+                        .bulkWrite(bulkRequests);
+                ackTime = System.currentTimeMillis();
+                bulkRequests.clear();
+                break;
+            } catch (MongoException e) {
+                LOG.error("Bulk Write to MongoDB failed, retry times = {}", i, 
e);

Review Comment:
   This shouldn't be logged as an error; retries aren't unusual. Only log an 
error if it fails completely.
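Roughly, the catch clause above could become something like this (sketch only; assumes that exhausting the retries should fail the write, and that the rest of doBulkWrite stays as in this hunk):

```java
            } catch (MongoException e) {
                if (i == maxRetryTimes) {
                    // Retries are exhausted; only now is this an actual failure worth an error log.
                    LOG.error("Bulk write to MongoDB failed after {} retries.", maxRetryTimes, e);
                    throw new IOException("Bulk write to MongoDB failed.", e);
                }
                // Intermediate failures are expected from time to time; keep them below error level.
                LOG.debug("Bulk write to MongoDB failed, retrying (attempt {}/{}).", i + 1, maxRetryTimes, e);
            }
```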



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
