twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419928465



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
+
+       public static final ConfigOption<Integer> PROPERTY_VERSION = 
ConfigOptions.key("property-version")
+               .intType()
+               .defaultValue(1)
+               .withDescription(
+                       "Version of the overall property design. This option is 
meant for future backwards compatibility.");
+
+       public static final ConfigOption<String> CONNECTOR = 
ConfigOptions.key("connector")
+               .stringType()
+               .noDefaultValue()
+               .withDescription(
+                       "Uniquely identifies the connector of a dynamic table 
that is used for accessing data in " +
+                       "an external system. Its value is used during table 
source and table sink discovery.");
+
+       public static final String FORMAT_PREFIX = "format.";
+
+       public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+       public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+       /**
+        * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+        *
+        * <p>It considers {@link Catalog#getFactory()} if provided.
+        */
+       public static DynamicTableSource createTableSource(
+                       @Nullable Catalog catalog,
+                       ObjectIdentifier objectIdentifier,
+                       CatalogTable catalogTable,
+                       ReadableConfig configuration,
+                       ClassLoader classLoader) {
+               final DefaultDynamicTableContext context = new 
DefaultDynamicTableContext(
+                       objectIdentifier,
+                       catalogTable,
+                       configuration,
+                       classLoader);
+               try {
+                       final DynamicTableSourceFactory factory = 
getDynamicTableFactory(
+                               DynamicTableSourceFactory.class,
+                               catalog,
+                               context);
+                       return factory.createDynamicTableSource(context);
+               } catch (Throwable t) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Unable to create a source for reading 
table '%s'.\n\n" +
+                                       "Table options are:\n\n" +
+                                       "%s",
+                                       objectIdentifier.asSummaryString(),
+                                       catalogTable.getOptions()
+                                               .entrySet()
+                                               .stream()
+                                               .map(e -> 
stringifyOption(e.getKey(), e.getValue()))
+                                               .sorted()
+                                               
.collect(Collectors.joining("\n"))),
+                               t);
+               }
+       }
+
+       /**
+        * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+        *
+        * <p>It considers {@link Catalog#getFactory()} if provided.
+        */
+       public static DynamicTableSink createTableSink(
+                       @Nullable Catalog catalog,
+                       ObjectIdentifier objectIdentifier,
+                       CatalogTable catalogTable,
+                       ReadableConfig configuration,
+                       ClassLoader classLoader) {
+               final DefaultDynamicTableContext context = new 
DefaultDynamicTableContext(
+                       objectIdentifier,
+                       catalogTable,
+                       configuration,
+                       classLoader);
+               try {
+                       final DynamicTableSinkFactory factory = 
getDynamicTableFactory(
+                               DynamicTableSinkFactory.class,
+                               catalog,
+                               context);
+                       return factory.createDynamicTableSink(context);
+               } catch (Throwable t) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Unable to create a sink for writing 
table '%s'.\n\n" +
+                                       "Table options are:\n\n" +
+                                       "%s",
+                                       objectIdentifier.asSummaryString(),
+                                       catalogTable.getOptions()
+                                               .entrySet()
+                                               .stream()
+                                               .map(e -> 
stringifyOption(e.getKey(), e.getValue()))
+                                               .sorted()
+                                               
.collect(Collectors.joining("\n"))),
+                               t);
+               }
+       }
+
+       /**
+        * Creates a utility that helps in discovering formats and validating 
all options for a {@link DynamicTableFactory}.
+        *
+        * <p>The following example sketches the usage:
+        * <pre>{@code
+        * // in createDynamicTableSource()
+        * helper = FactoryUtil.createTableFactoryHelper(this, context);
+        * keyFormat = helper.discoverScanFormat(classloader, 
MyFormatFactory.class, KEY_OPTION, "prefix");
+        * valueFormat = helper.discoverScanFormat(classloader, 
MyFormatFactory.class, VALUE_OPTION, "prefix");
+        * helper.validate();
+        * ... // construct connector with discovered formats
+        * }</pre>
+        *
+        * <p>Note: This utility checks for left-over options in the final step.
+        */
+       public static TableFactoryHelper createTableFactoryHelper(

Review comment:
       The format logic is a bit different from the top-level factory. It 
cannot check unconsumed option keys, but only has a read-only view through which 
it can access its own options. Otherwise we would need to change `ReadableConfig` 
to support listing all options, and maybe also `DelegatingConfiguration`. I think 
functionality-wise it doesn't make a difference. It is sufficient that only the 
top-level factory checks for unconsumed options, considering all nested factories.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to