MonsterChenzhuo commented on a change in pull request #17847:
URL: https://github.com/apache/flink/pull/17847#discussion_r754064927



##########
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MongodbDynamicTableSourceSinkFactory implements 
DynamicTableSinkFactory, DynamicTableSourceFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongodbDynamicTableSourceSinkFactory.class);
+    @VisibleForTesting
+    public static final String IDENTIFIER = "mongodb";
+    public static final ConfigOption<String> DATABASE = 
ConfigOptions.key("database".toLowerCase())

Review comment:
       > 不支持upsert么
   
   @dailai The connector supports two modes, "insert" and "merge into". Merge 
into is similar to what you call update. It is used when the primary key is 
unique, there is update for the primary key, and there is no insert for the 
primary key




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to