This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a3e2ae47d9bbfebbddae01d5a19431cd7d03eee5 Author: Ingo Bürk <ingo.bu...@tngtech.com> AuthorDate: Mon Mar 22 15:50:45 2021 +0100 [FLINK-21822][table] Introduce CatalogFactoryHelper THis closes #15245. --- .../catalog/hive/factories/HiveCatalogFactory.java | 43 +++--------- .../jdbc/catalog/factory/JdbcCatalogFactory.java | 42 +++--------- .../catalog/GenericInMemoryCatalogFactory.java | 14 ++-- .../apache/flink/table/factories/FactoryUtil.java | 76 +++++++++++++++++++++- .../flink/table/factories/FactoryUtilTest.java | 31 ++++++++- 5 files changed, 127 insertions(+), 79 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java index c1dce06..0d6b779 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java @@ -19,11 +19,10 @@ package org.apache.flink.table.catalog.hive.factories; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,41 +64,15 @@ public class HiveCatalogFactory implements CatalogFactory { @Override public Catalog createCatalog(Context context) { - final Configuration configuration = Configuration.fromMap(context.getOptions()); - validateConfiguration(configuration); + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); return new HiveCatalog( context.getName(), - configuration.getString(DEFAULT_DATABASE), - configuration.getString(HIVE_CONF_DIR), - configuration.getString(HADOOP_CONF_DIR), - configuration.getString(HIVE_VERSION)); - } - - private void validateConfiguration(Configuration configuration) { - final String defaultDatabase = configuration.getString(DEFAULT_DATABASE); - if (defaultDatabase != null && defaultDatabase.isEmpty()) { - throw new ValidationException( - String.format( - "Option '%s' was provided, but is empty", DEFAULT_DATABASE.key())); - } - - final String hiveConfDir = configuration.getString(HIVE_CONF_DIR); - if (hiveConfDir != null && hiveConfDir.isEmpty()) { - throw new ValidationException( - String.format("Option '%s' was provided, but is empty", HIVE_CONF_DIR.key())); - } - - final String hadoopConfDir = configuration.getString(HADOOP_CONF_DIR); - if (hadoopConfDir != null && hadoopConfDir.isEmpty()) { - throw new ValidationException( - String.format("Option '%s' was provided, but is empty", HADOOP_CONF_DIR.key())); - } - - final String hiveVersion = configuration.getString(HIVE_VERSION); - if (hiveVersion != null && hiveVersion.isEmpty()) { - throw new ValidationException( - String.format("Option '%s' was provided, but is empty", HIVE_VERSION.key())); - } + helper.getOptions().get(DEFAULT_DATABASE), + helper.getOptions().get(HIVE_CONF_DIR), + helper.getOptions().get(HADOOP_CONF_DIR), + helper.getOptions().get(HIVE_VERSION)); } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java index 3b361ad..2fb74f2 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java @@ -19,11 +19,10 @@ package org.apache.flink.connector.jdbc.catalog.factory; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,40 +65,15 @@ public class JdbcCatalogFactory implements CatalogFactory { @Override public Catalog createCatalog(Context context) { - final Configuration configuration = Configuration.fromMap(context.getOptions()); - validateConfiguration(configuration); + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); return new JdbcCatalog( context.getName(), - configuration.getString(DEFAULT_DATABASE), - configuration.getString(USERNAME), - configuration.getString(PASSWORD), - configuration.getString(BASE_URL)); - } - - private void validateConfiguration(Configuration configuration) { - final String defaultDatabase = configuration.getString(DEFAULT_DATABASE); - if (defaultDatabase == null || defaultDatabase.isEmpty()) { - throw new ValidationException( - String.format("Missing or empty value for '%s'", DEFAULT_DATABASE.key())); - } - - final String username = configuration.getString(USERNAME); - if (username == null || username.isEmpty()) { - throw new ValidationException( - String.format("Missing or empty value for '%s'", USERNAME.key())); - } - - final String password = configuration.getString(PASSWORD); - if (password == null || password.isEmpty()) { - throw new ValidationException( - String.format("Missing or empty value for '%s'", PASSWORD.key())); - } - - final String baseUrl = configuration.getString(BASE_URL); - if (baseUrl == null || baseUrl.isEmpty()) { - throw new ValidationException( - String.format("Missing or empty value for '%s'", BASE_URL.key())); - } + helper.getOptions().get(DEFAULT_DATABASE), + helper.getOptions().get(USERNAME), + helper.getOptions().get(PASSWORD), + helper.getOptions().get(BASE_URL)); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java index 6ee2c14..95c0a02 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java @@ -19,9 +19,8 @@ package org.apache.flink.table.catalog; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; import java.util.Collections; import java.util.HashSet; @@ -53,12 +52,11 @@ public class GenericInMemoryCatalogFactory implements CatalogFactory { @Override public Catalog createCatalog(Context context) { - final Configuration configuration = Configuration.fromMap(context.getOptions()); - final String defaultDatabase = configuration.getString(DEFAULT_DATABASE); - if (defaultDatabase == null || defaultDatabase.isEmpty()) { - throw new ValidationException("The default database must not be empty"); - } + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); - return new GenericInMemoryCatalog(context.getName(), defaultDatabase); + return new GenericInMemoryCatalog( + context.getName(), helper.getOptions().get(DEFAULT_DATABASE)); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index 413193d..c038e79 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -54,6 +54,7 @@ import java.util.ServiceConfigurationError; import java.util.ServiceLoader; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; /** Utility for working with {@link Factory}s. */ @PublicEvolving @@ -175,6 +176,16 @@ public final class FactoryUtil { } /** + * Creates a utility that helps validating options for a {@link CatalogFactory}. + * + * <p>Note: This utility checks for left-over options in the final step. + */ + public static CatalogFactoryHelper createCatalogFactoryHelper( + CatalogFactory factory, CatalogFactory.Context context) { + return new CatalogFactoryHelper(factory, context); + } + + /** * Creates a utility that helps in discovering formats and validating all options for a {@link * DynamicTableFactory}. * @@ -376,7 +387,7 @@ public final class FactoryUtil { if (remainingOptionKeys.size() > 0) { throw new ValidationException( String.format( - "Unsupported options found for connector '%s'.\n\n" + "Unsupported options found for '%s'.\n\n" + "Unsupported options:\n\n" + "%s\n\n" + "Supported options:\n\n" @@ -508,6 +519,69 @@ public final class FactoryUtil { // -------------------------------------------------------------------------------------------- /** + * Helper utility for catalog implementations to validate options provided by {@link + * CatalogFactory}. + */ + @PublicEvolving + public static class CatalogFactoryHelper { + + private final CatalogFactory catalogFactory; + private final CatalogFactory.Context context; + private final Configuration configuration; + + private final Set<String> consumedOptionKeys; + + public CatalogFactoryHelper(CatalogFactory catalogFactory, CatalogFactory.Context context) { + this.catalogFactory = catalogFactory; + this.context = context; + this.configuration = Configuration.fromMap(context.getOptions()); + + consumedOptionKeys = new HashSet<>(); + consumedOptionKeys.add(PROPERTY_VERSION.key()); + Stream.concat( + catalogFactory.requiredOptions().stream(), + catalogFactory.optionalOptions().stream()) + .map(ConfigOption::key) + .forEach(consumedOptionKeys::add); + } + + /** + * Validates the options of the {@link CatalogFactory}. It checks for unconsumed option + * keys. + */ + public void validate() { + validateFactoryOptions(catalogFactory, configuration); + validateUnconsumedKeys( + catalogFactory.factoryIdentifier(), configuration.keySet(), consumedOptionKeys); + } + + /** + * Validates the options of the {@link CatalogFactory}. It checks for unconsumed option keys + * while ignoring the options with given prefixes. + * + * <p>The option keys that have given prefix {@code prefixToSkip} would just be skipped for + * validation. + * + * @param prefixesToSkip Set of option key prefixes to skip validation + */ + public void validateExcept(String... prefixesToSkip) { + Preconditions.checkArgument( + prefixesToSkip.length > 0, "Prefixes to skip can not be empty."); + final List<String> prefixesList = Arrays.asList(prefixesToSkip); + consumedOptionKeys.addAll( + configuration.keySet().stream() + .filter(key -> prefixesList.stream().anyMatch(key::startsWith)) + .collect(Collectors.toSet())); + validate(); + } + + /** Returns all options of the catalog. */ + public ReadableConfig getOptions() { + return configuration; + } + } + + /** * Helper utility for discovering formats and validating all options for a {@link * DynamicTableFactory}. * diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java index 7b04ea0..8d523f2 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.function.Consumer; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; @@ -142,7 +143,7 @@ public class FactoryUtilTest { @Test public void testUnconsumedOption() { expectError( - "Unsupported options found for connector 'test-connector'.\n\n" + "Unsupported options found for 'test-connector'.\n\n" + "Unsupported options:\n\n" + "this-is-also-not-consumed\n" + "this-is-not-consumed\n\n" @@ -297,6 +298,34 @@ public class FactoryUtilTest { "my-database"); } + @Test + public void testCatalogFactoryHelper() { + final FactoryUtil.CatalogFactoryHelper helper1 = + FactoryUtil.createCatalogFactoryHelper( + new TestCatalogFactory(), + new FactoryUtil.DefaultCatalogContext( + "test", + Collections.emptyMap(), + null, + Thread.currentThread().getContextClassLoader())); + + // No error + helper1.validate(); + + final FactoryUtil.CatalogFactoryHelper helper2 = + FactoryUtil.createCatalogFactoryHelper( + new TestCatalogFactory(), + new FactoryUtil.DefaultCatalogContext( + "test", + Collections.singletonMap("x", "y"), + null, + Thread.currentThread().getContextClassLoader())); + + thrown.expect(ValidationException.class); + thrown.expect(containsMessage("Unsupported options found for 'test-catalog'")); + helper2.validate(); + } + // -------------------------------------------------------------------------------------------- private void expectError(String message) {