This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a3e2ae47d9bbfebbddae01d5a19431cd7d03eee5
Author: Ingo Bürk <ingo.bu...@tngtech.com>
AuthorDate: Mon Mar 22 15:50:45 2021 +0100

    [FLINK-21822][table] Introduce CatalogFactoryHelper
    
    THis closes #15245.
---
 .../catalog/hive/factories/HiveCatalogFactory.java | 43 +++---------
 .../jdbc/catalog/factory/JdbcCatalogFactory.java   | 42 +++---------
 .../catalog/GenericInMemoryCatalogFactory.java     | 14 ++--
 .../apache/flink/table/factories/FactoryUtil.java  | 76 +++++++++++++++++++++-
 .../flink/table/factories/FactoryUtilTest.java     | 31 ++++++++-
 5 files changed, 127 insertions(+), 79 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
index c1dce06..0d6b779 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
@@ -19,11 +19,10 @@
 package org.apache.flink.table.catalog.hive.factories;
 
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,41 +64,15 @@ public class HiveCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog createCatalog(Context context) {
-        final Configuration configuration = 
Configuration.fromMap(context.getOptions());
-        validateConfiguration(configuration);
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
 
         return new HiveCatalog(
                 context.getName(),
-                configuration.getString(DEFAULT_DATABASE),
-                configuration.getString(HIVE_CONF_DIR),
-                configuration.getString(HADOOP_CONF_DIR),
-                configuration.getString(HIVE_VERSION));
-    }
-
-    private void validateConfiguration(Configuration configuration) {
-        final String defaultDatabase = 
configuration.getString(DEFAULT_DATABASE);
-        if (defaultDatabase != null && defaultDatabase.isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Option '%s' was provided, but is empty", 
DEFAULT_DATABASE.key()));
-        }
-
-        final String hiveConfDir = configuration.getString(HIVE_CONF_DIR);
-        if (hiveConfDir != null && hiveConfDir.isEmpty()) {
-            throw new ValidationException(
-                    String.format("Option '%s' was provided, but is empty", 
HIVE_CONF_DIR.key()));
-        }
-
-        final String hadoopConfDir = configuration.getString(HADOOP_CONF_DIR);
-        if (hadoopConfDir != null && hadoopConfDir.isEmpty()) {
-            throw new ValidationException(
-                    String.format("Option '%s' was provided, but is empty", 
HADOOP_CONF_DIR.key()));
-        }
-
-        final String hiveVersion = configuration.getString(HIVE_VERSION);
-        if (hiveVersion != null && hiveVersion.isEmpty()) {
-            throw new ValidationException(
-                    String.format("Option '%s' was provided, but is empty", 
HIVE_VERSION.key()));
-        }
+                helper.getOptions().get(DEFAULT_DATABASE),
+                helper.getOptions().get(HIVE_CONF_DIR),
+                helper.getOptions().get(HADOOP_CONF_DIR),
+                helper.getOptions().get(HIVE_VERSION));
     }
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
index 3b361ad..2fb74f2 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
@@ -19,11 +19,10 @@
 package org.apache.flink.connector.jdbc.catalog.factory;
 
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,40 +65,15 @@ public class JdbcCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog createCatalog(Context context) {
-        final Configuration configuration = 
Configuration.fromMap(context.getOptions());
-        validateConfiguration(configuration);
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
 
         return new JdbcCatalog(
                 context.getName(),
-                configuration.getString(DEFAULT_DATABASE),
-                configuration.getString(USERNAME),
-                configuration.getString(PASSWORD),
-                configuration.getString(BASE_URL));
-    }
-
-    private void validateConfiguration(Configuration configuration) {
-        final String defaultDatabase = 
configuration.getString(DEFAULT_DATABASE);
-        if (defaultDatabase == null || defaultDatabase.isEmpty()) {
-            throw new ValidationException(
-                    String.format("Missing or empty value for '%s'", 
DEFAULT_DATABASE.key()));
-        }
-
-        final String username = configuration.getString(USERNAME);
-        if (username == null || username.isEmpty()) {
-            throw new ValidationException(
-                    String.format("Missing or empty value for '%s'", 
USERNAME.key()));
-        }
-
-        final String password = configuration.getString(PASSWORD);
-        if (password == null || password.isEmpty()) {
-            throw new ValidationException(
-                    String.format("Missing or empty value for '%s'", 
PASSWORD.key()));
-        }
-
-        final String baseUrl = configuration.getString(BASE_URL);
-        if (baseUrl == null || baseUrl.isEmpty()) {
-            throw new ValidationException(
-                    String.format("Missing or empty value for '%s'", 
BASE_URL.key()));
-        }
+                helper.getOptions().get(DEFAULT_DATABASE),
+                helper.getOptions().get(USERNAME),
+                helper.getOptions().get(PASSWORD),
+                helper.getOptions().get(BASE_URL));
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
index 6ee2c14..95c0a02 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
@@ -19,9 +19,8 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -53,12 +52,11 @@ public class GenericInMemoryCatalogFactory implements 
CatalogFactory {
 
     @Override
     public Catalog createCatalog(Context context) {
-        final Configuration configuration = 
Configuration.fromMap(context.getOptions());
-        final String defaultDatabase = 
configuration.getString(DEFAULT_DATABASE);
-        if (defaultDatabase == null || defaultDatabase.isEmpty()) {
-            throw new ValidationException("The default database must not be 
empty");
-        }
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
 
-        return new GenericInMemoryCatalog(context.getName(), defaultDatabase);
+        return new GenericInMemoryCatalog(
+                context.getName(), helper.getOptions().get(DEFAULT_DATABASE));
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index 413193d..c038e79 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -54,6 +54,7 @@ import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /** Utility for working with {@link Factory}s. */
 @PublicEvolving
@@ -175,6 +176,16 @@ public final class FactoryUtil {
     }
 
     /**
+     * Creates a utility that helps validating options for a {@link 
CatalogFactory}.
+     *
+     * <p>Note: This utility checks for left-over options in the final step.
+     */
+    public static CatalogFactoryHelper createCatalogFactoryHelper(
+            CatalogFactory factory, CatalogFactory.Context context) {
+        return new CatalogFactoryHelper(factory, context);
+    }
+
+    /**
      * Creates a utility that helps in discovering formats and validating all 
options for a {@link
      * DynamicTableFactory}.
      *
@@ -376,7 +387,7 @@ public final class FactoryUtil {
         if (remainingOptionKeys.size() > 0) {
             throw new ValidationException(
                     String.format(
-                            "Unsupported options found for connector '%s'.\n\n"
+                            "Unsupported options found for '%s'.\n\n"
                                     + "Unsupported options:\n\n"
                                     + "%s\n\n"
                                     + "Supported options:\n\n"
@@ -508,6 +519,69 @@ public final class FactoryUtil {
     // 
--------------------------------------------------------------------------------------------
 
     /**
+     * Helper utility for catalog implementations to validate options provided 
by {@link
+     * CatalogFactory}.
+     */
+    @PublicEvolving
+    public static class CatalogFactoryHelper {
+
+        private final CatalogFactory catalogFactory;
+        private final CatalogFactory.Context context;
+        private final Configuration configuration;
+
+        private final Set<String> consumedOptionKeys;
+
+        public CatalogFactoryHelper(CatalogFactory catalogFactory, 
CatalogFactory.Context context) {
+            this.catalogFactory = catalogFactory;
+            this.context = context;
+            this.configuration = Configuration.fromMap(context.getOptions());
+
+            consumedOptionKeys = new HashSet<>();
+            consumedOptionKeys.add(PROPERTY_VERSION.key());
+            Stream.concat(
+                            catalogFactory.requiredOptions().stream(),
+                            catalogFactory.optionalOptions().stream())
+                    .map(ConfigOption::key)
+                    .forEach(consumedOptionKeys::add);
+        }
+
+        /**
+         * Validates the options of the {@link CatalogFactory}. It checks for 
unconsumed option
+         * keys.
+         */
+        public void validate() {
+            validateFactoryOptions(catalogFactory, configuration);
+            validateUnconsumedKeys(
+                    catalogFactory.factoryIdentifier(), 
configuration.keySet(), consumedOptionKeys);
+        }
+
+        /**
+         * Validates the options of the {@link CatalogFactory}. It checks for 
unconsumed option keys
+         * while ignoring the options with given prefixes.
+         *
+         * <p>The option keys that have given prefix {@code prefixToSkip} 
would just be skipped for
+         * validation.
+         *
+         * @param prefixesToSkip Set of option key prefixes to skip validation
+         */
+        public void validateExcept(String... prefixesToSkip) {
+            Preconditions.checkArgument(
+                    prefixesToSkip.length > 0, "Prefixes to skip can not be 
empty.");
+            final List<String> prefixesList = Arrays.asList(prefixesToSkip);
+            consumedOptionKeys.addAll(
+                    configuration.keySet().stream()
+                            .filter(key -> 
prefixesList.stream().anyMatch(key::startsWith))
+                            .collect(Collectors.toSet()));
+            validate();
+        }
+
+        /** Returns all options of the catalog. */
+        public ReadableConfig getOptions() {
+            return configuration;
+        }
+    }
+
+    /**
      * Helper utility for discovering formats and validating all options for a 
{@link
      * DynamicTableFactory}.
      *
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index 7b04ea0..8d523f2 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -43,6 +43,7 @@ import java.util.Optional;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
@@ -142,7 +143,7 @@ public class FactoryUtilTest {
     @Test
     public void testUnconsumedOption() {
         expectError(
-                "Unsupported options found for connector 'test-connector'.\n\n"
+                "Unsupported options found for 'test-connector'.\n\n"
                         + "Unsupported options:\n\n"
                         + "this-is-also-not-consumed\n"
                         + "this-is-not-consumed\n\n"
@@ -297,6 +298,34 @@ public class FactoryUtilTest {
                 "my-database");
     }
 
+    @Test
+    public void testCatalogFactoryHelper() {
+        final FactoryUtil.CatalogFactoryHelper helper1 =
+                FactoryUtil.createCatalogFactoryHelper(
+                        new TestCatalogFactory(),
+                        new FactoryUtil.DefaultCatalogContext(
+                                "test",
+                                Collections.emptyMap(),
+                                null,
+                                
Thread.currentThread().getContextClassLoader()));
+
+        // No error
+        helper1.validate();
+
+        final FactoryUtil.CatalogFactoryHelper helper2 =
+                FactoryUtil.createCatalogFactoryHelper(
+                        new TestCatalogFactory(),
+                        new FactoryUtil.DefaultCatalogContext(
+                                "test",
+                                Collections.singletonMap("x", "y"),
+                                null,
+                                
Thread.currentThread().getContextClassLoader()));
+
+        thrown.expect(ValidationException.class);
+        thrown.expect(containsMessage("Unsupported options found for 
'test-catalog'"));
+        helper2.validate();
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private void expectError(String message) {

Reply via email to