Airblader commented on a change in pull request #17847:
URL: https://github.com/apache/flink/pull/17847#discussion_r753988501



##########
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MongodbDynamicTableSourceSinkFactory implements 
DynamicTableSinkFactory, DynamicTableSourceFactory {

Review comment:
       Please add `@Internal` here

##########
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MongodbDynamicTableSourceSinkFactory implements 
DynamicTableSinkFactory, DynamicTableSourceFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongodbDynamicTableSourceSinkFactory.class);
+    @VisibleForTesting
+    public static final String IDENTIFIER = "mongodb";
+    public static final ConfigOption<String> DATABASE = 
ConfigOptions.key("database".toLowerCase())

Review comment:
       Please move the config options and the factory identifier to a 
`*ConnectorOptions` class (refer to e.g. `KafkaConnectorOptions`) and mark that 
class `@PublicEvolving`. Also, that class and the entire connector should live 
in a `o.a.f.connector.mongodb` package for consistency, and the class with the 
options in `o.a.f.connector.mongodb.table`.

##########
File path: 
flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,133 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MongodbDynamicTableSourceSinkFactory implements 
DynamicTableSinkFactory, DynamicTableSourceFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongodbDynamicTableSourceSinkFactory.class);
+    @VisibleForTesting
+    public static final String IDENTIFIER = "mongodb";
+    public static final ConfigOption<String> DATABASE = 
ConfigOptions.key("database".toLowerCase())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The data base to connect.");
+    public static final ConfigOption<String> URI = 
ConfigOptions.key("uri".toLowerCase())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The uri to connect.");
+    public static final ConfigOption<String> COLLECTION_NAME = ConfigOptions
+            .key("collection".toLowerCase())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The name of the collection to return.");
+    public static final ConfigOption<Integer> MAX_CONNECTION_IDLE_TIME = 
ConfigOptions
+            .key("maxConnectionIdleTime".toLowerCase())
+            .intType()
+            .defaultValue(Integer.valueOf(60000))
+            .withDescription("The maximum idle time for a pooled connection.");
+    public static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions
+            .key("batchSize".toLowerCase())
+            .intType()
+            .defaultValue(Integer.valueOf(1024))
+            .withDescription("The batch size when table invoking.");
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a 
suitable format factory.");
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ContextUtil.transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+
+        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
+                (String) helper
+                        .getOptions()
+                        .get(DATABASE),
+                (String) helper.getOptions().get(COLLECTION_NAME),
+                (String) helper.getOptions().get(URI),
+                ((Integer) 
helper.getOptions().get(MAX_CONNECTION_IDLE_TIME)).intValue(),
+                ((Integer) helper.getOptions().get(BATCH_SIZE)).intValue());
+
+        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context
+                .getCatalogTable()
+                .getSchema());
+        LOG.info("Create dynamic mongoDB table table: {}.", mongodbSinkConf);
+        return new MongodbDynamicTableSink(mongodbSinkConf, physicalSchema);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet();
+        requiredOptions.add(DATABASE);
+        requiredOptions.add(COLLECTION_NAME);
+        requiredOptions.add(URI);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionals = new HashSet();
+        optionals.add(MAX_CONNECTION_IDLE_TIME);
+        optionals.add(BATCH_SIZE);
+        return optionals;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        ContextUtil.transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
+                (String) helper
+                        .getOptions()
+                        .get(DATABASE),
+                (String) helper.getOptions().get(COLLECTION_NAME),
+                (String) helper.getOptions().get(URI),
+                ((Integer) 
helper.getOptions().get(MAX_CONNECTION_IDLE_TIME)).intValue(),
+                ((Integer) helper.getOptions().get(BATCH_SIZE)).intValue());
+
+        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context

Review comment:
       Please migrate this from the deprecated `TableSchema` stack to the new 
`Schema` stack. In fact, here we don't want to pass the schema along at all, 
but rather only the data type for it. You can access that through 
`context.getPhysicalRowDataType()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to