MonsterChenzhuo commented on a change in pull request #17847: URL: https://github.com/apache/flink/pull/17847#discussion_r754064927
########## File path: flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java ########## @@ -0,0 +1,133 @@ +package org.apache.flink.mongodb.table; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink; +import org.apache.flink.mongodb.table.sink.MongodbSinkConf; +import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource; +import org.apache.flink.mongodb.table.util.ContextUtil; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +public class MongodbDynamicTableSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory { + + private static final Logger LOG = LoggerFactory.getLogger(MongodbDynamicTableSourceSinkFactory.class); + @VisibleForTesting + public static final String IDENTIFIER = "mongodb"; + public static final ConfigOption<String> DATABASE = ConfigOptions.key("database".toLowerCase()) Review comment: > 不支持upsert么 @dailai The connector supports two modes, "insert" and "merge into". Merge into is similar to what you call update. 
It is used when the primary key is unique, there is an update for the primary key, and there is no insert for the primary key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org