This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0fa390da5081b7c6208a194159f5079fa1eb89f
Author: Ingo Bürk <ingo.bu...@tngtech.com>
AuthorDate: Thu Mar 18 09:03:20 2021 +0100

    [FLINK-21822][table] Migrate built-in and test catalog factories to new stack
---
 .../flink/connectors/hive/HiveTableSink.java       |   4 +-
 .../flink/connectors/hive/HiveTableSource.java     |   4 +-
 .../flink/table/catalog/hive/HiveCatalog.java      |   4 +-
 .../hive/descriptors/HiveCatalogDescriptor.java    |  71 ----
 .../hive/descriptors/HiveCatalogValidator.java     |  39 ---
 .../catalog/hive/factories/HiveCatalogFactory.java | 120 ++++---
 .../hive/factories/HiveCatalogFactoryOptions.java  |  51 +++
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../org.apache.flink.table.factories.TableFactory  |   1 -
 .../descriptors/HiveCatalogDescriptorTest.java     |  53 ---
 .../hive/factories/HiveCatalogFactoryTest.java     |  85 +++--
 .../jdbc/catalog/factory/JdbcCatalogFactory.java   |  99 +++---
 .../catalog/factory/JdbcCatalogFactoryOptions.java |  48 +++
 .../table/descriptors/JdbcCatalogDescriptor.java   |  67 ----
 .../table/descriptors/JdbcCatalogValidator.java    |  41 ---
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../org.apache.flink.table.factories.TableFactory  |   1 -
 .../catalog/factory/JdbcCatalogFactoryTest.java    |  26 +-
 .../descriptors/JdbcCatalogDescriptorTest.java     |  61 ----
 .../table/client/config/entries/CatalogEntry.java  |   9 +-
 .../gateway/context/ExecutionContextTest.java      |  21 +-
 .../table/client/gateway/local/DependencyTest.java |  66 ++--
 .../client/gateway/local/EnvironmentTest.java      |   4 +-
 .../client/gateway/utils/SimpleCatalogFactory.java |  64 ++--
 ...ry => org.apache.flink.table.factories.Factory} |   5 +-
 .../org.apache.flink.table.factories.TableFactory  |   4 -
 .../src/test/resources/sql/catalog_database.q      |   3 +-
 .../src/test/resources/sql/function.q              |   2 +-
 .../test/resources/test-sql-client-catalogs.yaml   |   6 +-
 .../catalog/GenericInMemoryCatalogFactory.java     |  62 ++--
 .../GenericInMemoryCatalogFactoryOptions.java}     |  25 +-
 .../GenericInMemoryCatalogDescriptor.java          |  42 ---
 ...ry => org.apache.flink.table.factories.Factory} |   0
 .../catalog/GenericInMemoryCatalogFactoryTest.java |  19 +-
 .../GenericInMemoryCatalogDescriptorTest.java      |  64 ----
 .../flink/table/catalog/CommonCatalogOptions.java  |  44 +++
 .../flink/table/descriptors/CatalogDescriptor.java |  86 -----
 .../descriptors/CatalogDescriptorValidator.java    |  45 ---
 .../apache/flink/table/factories/FactoryUtil.java  |  26 +-
 .../flink/table/factories/TableFactoryService.java |   3 +-
 .../table/descriptors/CatalogDescriptorTest.java   | 103 ------
 .../flink/table/factories/FactoryUtilTest.java     |  24 ++
 .../flink/table/factories/TestCatalogFactory.java  | 364 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../flink/table/planner/catalog/CatalogITCase.java |   6 +-
 .../table/factories/CatalogFactoryServiceTest.java |  22 +-
 ...gFactory.java => TestLegacyCatalogFactory.java} |  26 +-
 .../org.apache.flink.table.factories.TableFactory  |   2 +-
 48 files changed, 906 insertions(+), 1020 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 229f309..9bc7838 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -45,7 +45,7 @@ import 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -129,7 +129,7 @@ public class HiveTableSink implements DynamicTableSink, 
SupportsPartitioning, Su
         this.catalogTable = table;
         hiveVersion =
                 Preconditions.checkNotNull(
-                        jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+                        jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
                         "Hive version is not defined");
         hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
         tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 582c926..7cbc592 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -110,7 +110,7 @@ public class HiveTableSource
         this.catalogTable = Preconditions.checkNotNull(catalogTable);
         this.hiveVersion =
                 Preconditions.checkNotNull(
-                        jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+                        jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
                         "Hive version is not defined");
         this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
     }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 38befdf..9cd7568 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -60,7 +60,7 @@ import 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
 import 
org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
@@ -203,7 +203,7 @@ public class HiveCatalog extends AbstractCatalog {
         hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
         // add this to hiveConf to make sure table factory and source/sink see 
the same Hive version
         // as HiveCatalog
-        this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, hiveVersion);
+        this.hiveConf.set(HiveCatalogFactoryOptions.HIVE_VERSION.key(), hiveVersion);
 
         LOG.info("Created HiveCatalog '{}'", catalogName);
     }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java
deleted file mode 100644
index e6f9470..0000000
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive.descriptors;
-
-import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.descriptors.CatalogDescriptor;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_CONF_DIR;
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_VERSION;
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE;
-
-/** Catalog descriptor for {@link HiveCatalog}. */
-public class HiveCatalogDescriptor extends CatalogDescriptor {
-
-    private String hiveSitePath;
-    private String hiveVersion;
-
-    // TODO : set default database
-    public HiveCatalogDescriptor() {
-        super(CATALOG_TYPE_VALUE_HIVE, 1);
-    }
-
-    public HiveCatalogDescriptor hiveSitePath(String hiveSitePath) {
-        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveSitePath));
-        this.hiveSitePath = hiveSitePath;
-
-        return this;
-    }
-
-    public HiveCatalogDescriptor hiveVersion(String hiveVersion) {
-        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion));
-        this.hiveVersion = hiveVersion;
-        return this;
-    }
-
-    @Override
-    protected Map<String, String> toCatalogProperties() {
-        final DescriptorProperties properties = new DescriptorProperties();
-
-        if (hiveSitePath != null) {
-            properties.putString(CATALOG_HIVE_CONF_DIR, hiveSitePath);
-        }
-
-        if (hiveVersion != null) {
-            properties.putString(CATALOG_HIVE_VERSION, hiveVersion);
-        }
-
-        return properties.asMap();
-    }
-}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java
deleted file mode 100644
index 75399c9..0000000
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogValidator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive.descriptors;
-
-import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-
-/** Validator for {@link HiveCatalogDescriptor}. */
-public class HiveCatalogValidator extends CatalogDescriptorValidator {
-    public static final String CATALOG_TYPE_VALUE_HIVE = "hive";
-    public static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
-    public static final String CATALOG_HIVE_VERSION = "hive-version";
-    public static final String CATALOG_HADOOP_CONF_DIR = "hadoop-conf-dir";
-
-    @Override
-    public void validate(DescriptorProperties properties) {
-        super.validate(properties);
-        properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_HIVE, false);
-        properties.validateString(CATALOG_HIVE_CONF_DIR, true, 1);
-        properties.validateString(CATALOG_HIVE_VERSION, true, 1);
-        properties.validateString(CATALOG_HADOOP_CONF_DIR, true, 1);
-    }
-}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
index ace74ee..c1dce06 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
@@ -18,92 +18,88 @@
 
 package org.apache.flink.table.catalog.hive.factories;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HADOOP_CONF_DIR;
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_CONF_DIR;
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_VERSION;
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+import static 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.DEFAULT_DATABASE;
+import static 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HADOOP_CONF_DIR;
+import static 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR;
+import static 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_VERSION;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 
 /** Catalog factory for {@link HiveCatalog}. */
 public class HiveCatalogFactory implements CatalogFactory {
    private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class);
 
     @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = new HashMap<>();
-        context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_HIVE); // hive
-        context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
-        return context;
+    public String factoryIdentifier() {
+        return HiveCatalogFactoryOptions.IDENTIFIER;
     }
 
     @Override
-    public List<String> supportedProperties() {
-        List<String> properties = new ArrayList<>();
-
-        // default database
-        properties.add(CATALOG_DEFAULT_DATABASE);
-
-        properties.add(CATALOG_HIVE_CONF_DIR);
-
-        properties.add(CATALOG_HIVE_VERSION);
-
-        properties.add(CATALOG_HADOOP_CONF_DIR);
-
-        return properties;
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
     }
 
     @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        final DescriptorProperties descriptorProperties = 
getValidatedProperties(properties);
-
-        final String defaultDatabase =
-                descriptorProperties
-                        .getOptionalString(CATALOG_DEFAULT_DATABASE)
-                        .orElse(HiveCatalog.DEFAULT_DB);
-
-        final Optional<String> hiveConfDir =
-                descriptorProperties.getOptionalString(CATALOG_HIVE_CONF_DIR);
-
-        final Optional<String> hadoopConfDir =
-                
descriptorProperties.getOptionalString(CATALOG_HADOOP_CONF_DIR);
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(PROPERTY_VERSION);
+        options.add(HIVE_CONF_DIR);
+        options.add(HIVE_VERSION);
+        options.add(HADOOP_CONF_DIR);
+        return options;
+    }
 
-        final String version =
-                descriptorProperties
-                        .getOptionalString(CATALOG_HIVE_VERSION)
-                        .orElse(HiveShimLoader.getHiveVersion());
+    @Override
+    public Catalog createCatalog(Context context) {
+        final Configuration configuration = Configuration.fromMap(context.getOptions());
+        validateConfiguration(configuration);
 
         return new HiveCatalog(
-                name,
-                defaultDatabase,
-                hiveConfDir.orElse(null),
-                hadoopConfDir.orElse(null),
-                version);
+                context.getName(),
+                configuration.getString(DEFAULT_DATABASE),
+                configuration.getString(HIVE_CONF_DIR),
+                configuration.getString(HADOOP_CONF_DIR),
+                configuration.getString(HIVE_VERSION));
     }
 
-    private static DescriptorProperties getValidatedProperties(Map<String, 
String> properties) {
-        final DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-
-        new HiveCatalogValidator().validate(descriptorProperties);
-
-        return descriptorProperties;
+    private void validateConfiguration(Configuration configuration) {
+        final String defaultDatabase = configuration.getString(DEFAULT_DATABASE);
+        if (defaultDatabase != null && defaultDatabase.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Option '%s' was provided, but is empty", DEFAULT_DATABASE.key()));
+        }
+
+        final String hiveConfDir = configuration.getString(HIVE_CONF_DIR);
+        if (hiveConfDir != null && hiveConfDir.isEmpty()) {
+            throw new ValidationException(
+                    String.format("Option '%s' was provided, but is empty", HIVE_CONF_DIR.key()));
+        }
+
+        final String hadoopConfDir = configuration.getString(HADOOP_CONF_DIR);
+        if (hadoopConfDir != null && hadoopConfDir.isEmpty()) {
+            throw new ValidationException(
+                    String.format("Option '%s' was provided, but is empty", HADOOP_CONF_DIR.key()));
+        }
+
+        final String hiveVersion = configuration.getString(HIVE_VERSION);
+        if (hiveVersion != null && hiveVersion.isEmpty()) {
+            throw new ValidationException(
+                    String.format("Option '%s' was provided, but is empty", HIVE_VERSION.key()));
+        }
     }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryOptions.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryOptions.java
new file mode 100644
index 0000000..59348cf
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+
+/** {@link ConfigOption}s for {@link HiveCatalog}. */
+@Internal
+public final class HiveCatalogFactoryOptions {
+
+    public static final String IDENTIFIER = "hive";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(HiveCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> HIVE_CONF_DIR =
+            ConfigOptions.key("hive-conf-dir").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> HIVE_VERSION =
+            ConfigOptions.key("hive-version")
+                    .stringType()
+                    .defaultValue(HiveShimLoader.getHiveVersion());
+
+    public static final ConfigOption<String> HADOOP_CONF_DIR =
+            ConfigOptions.key("hadoop-conf-dir").stringType().noDefaultValue();
+
+    private HiveCatalogFactoryOptions() {}
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 92%
copy from 
flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
copy to 
flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 86713a6..27d69ee 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
+org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
diff --git 
a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 0e5b457..5292f10 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,6 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
 org.apache.flink.table.module.hive.HiveModuleFactory
 org.apache.flink.table.planner.delegation.hive.HiveParserFactory
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java
deleted file mode 100644
index e6f50a2..0000000
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/descriptors/HiveCatalogDescriptorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.hive.descriptors;
-
-import org.apache.flink.table.descriptors.Descriptor;
-import org.apache.flink.table.descriptors.DescriptorTestBase;
-import org.apache.flink.table.descriptors.DescriptorValidator;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** Tests for {@link HiveCatalogDescriptor}. */
-public class HiveCatalogDescriptorTest extends DescriptorTestBase {
-
-    @Override
-    protected List<Descriptor> descriptors() {
-        final Descriptor descriptor = new HiveCatalogDescriptor();
-
-        return Arrays.asList(descriptor);
-    }
-
-    @Override
-    protected List<Map<String, String>> properties() {
-        final Map<String, String> props1 = new HashMap<>();
-        props1.put("type", "hive");
-        props1.put("property-version", "1");
-
-        return Arrays.asList(props1);
-    }
-
-    @Override
-    protected DescriptorValidator validator() {
-        return new HiveCatalogValidator();
-    }
-}
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
index 9314875..c6ce2a3 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
@@ -19,12 +19,12 @@
 package org.apache.flink.table.catalog.hive.factories;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogDescriptor;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,7 +45,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static 
org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HADOOP_CONF_DIR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -66,14 +65,13 @@ public class HiveCatalogFactoryTest extends TestLogger {
 
         final HiveCatalog expectedCatalog = 
HiveTestUtils.createHiveCatalog(catalogName, null);
 
-        final HiveCatalogDescriptor catalogDescriptor = new 
HiveCatalogDescriptor();
-        catalogDescriptor.hiveSitePath(CONF_DIR.getPath());
-
-        final Map<String, String> properties = 
catalogDescriptor.toProperties();
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
HiveCatalogFactoryOptions.IDENTIFIER);
+        options.put(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), 
CONF_DIR.getPath());
 
         final Catalog actualCatalog =
-                TableFactoryService.find(CatalogFactory.class, properties)
-                        .createCatalog(catalogName, properties);
+                FactoryUtil.createCatalog(
+                        catalogName, options, null, 
Thread.currentThread().getContextClassLoader());
 
         assertEquals(
                 "dummy-hms",
@@ -97,15 +95,14 @@ public class HiveCatalogFactoryTest extends TestLogger {
                 HiveTestUtils.createHiveCatalog(
                         catalogName, CONF_DIR.getPath(), hadoopConfDir, null);
 
-        final HiveCatalogDescriptor catalogDescriptor = new 
HiveCatalogDescriptor();
-        catalogDescriptor.hiveSitePath(CONF_DIR.getPath());
-
-        final Map<String, String> properties = new 
HashMap<>(catalogDescriptor.toProperties());
-        properties.put(CATALOG_HADOOP_CONF_DIR, hadoopConfDir);
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
HiveCatalogFactoryOptions.IDENTIFIER);
+        options.put(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), 
CONF_DIR.getPath());
+        options.put(HiveCatalogFactoryOptions.HADOOP_CONF_DIR.key(), 
hadoopConfDir);
 
         final Catalog actualCatalog =
-                TableFactoryService.find(CatalogFactory.class, properties)
-                        .createCatalog(catalogName, properties);
+                FactoryUtil.createCatalog(
+                        catalogName, options, null, 
Thread.currentThread().getContextClassLoader());
 
         checkEquals(expectedCatalog, (HiveCatalog) actualCatalog);
         assertEquals(mapredVal, ((HiveCatalog) 
actualCatalog).getHiveConf().get(mapredKey));
@@ -138,15 +135,21 @@ public class HiveCatalogFactoryTest extends TestLogger {
         CommonTestUtils.setEnv(newEnv);
 
         // create HiveCatalog use the Hadoop Configuration
-        final HiveCatalogDescriptor catalogDescriptor = new 
HiveCatalogDescriptor();
-        catalogDescriptor.hiveSitePath(CONF_DIR.getPath());
-        final Map<String, String> properties = 
catalogDescriptor.toProperties();
         final HiveConf hiveConf;
         try {
+            final Map<String, String> options = new HashMap<>();
+            options.put(
+                    CommonCatalogOptions.CATALOG_TYPE.key(), 
HiveCatalogFactoryOptions.IDENTIFIER);
+            options.put(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), 
CONF_DIR.getPath());
+
             final HiveCatalog hiveCatalog =
                     (HiveCatalog)
-                            TableFactoryService.find(CatalogFactory.class, 
properties)
-                                    .createCatalog(catalogName, properties);
+                            FactoryUtil.createCatalog(
+                                    catalogName,
+                                    options,
+                                    null,
+                                    
Thread.currentThread().getContextClassLoader());
+
             hiveConf = hiveCatalog.getHiveConf();
         } finally {
             // set the Env back
@@ -160,40 +163,50 @@ public class HiveCatalogFactoryTest extends TestLogger {
 
     @Test
     public void testDisallowEmbedded() {
-        expectedException.expect(IllegalArgumentException.class);
-        final Map<String, String> properties = new 
HiveCatalogDescriptor().toProperties();
+        expectedException.expect(ValidationException.class);
+
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
HiveCatalogFactoryOptions.IDENTIFIER);
 
-        TableFactoryService.find(CatalogFactory.class, properties)
-                .createCatalog("my_catalog", properties);
+        FactoryUtil.createCatalog(
+                "my_catalog", options, null, 
Thread.currentThread().getContextClassLoader());
     }
 
     @Test
     public void testCreateMultipleHiveCatalog() throws Exception {
-        final HiveCatalogDescriptor descriptor1 = new HiveCatalogDescriptor();
-        descriptor1.hiveSitePath(
+        final Map<String, String> props1 = new HashMap<>();
+        props1.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
HiveCatalogFactoryOptions.IDENTIFIER);
+        props1.put(
+                HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(),
                 Thread.currentThread()
                         .getContextClassLoader()
                         .getResource("test-multi-hive-conf1")
                         .getPath());
-        Map<String, String> props1 = descriptor1.toProperties();
 
-        final HiveCatalogDescriptor descriptor2 = new HiveCatalogDescriptor();
-        descriptor2.hiveSitePath(
+        final Map<String, String> props2 = new HashMap<>();
+        props2.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
HiveCatalogFactoryOptions.IDENTIFIER);
+        props2.put(
+                HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(),
                 Thread.currentThread()
                         .getContextClassLoader()
                         .getResource("test-multi-hive-conf2")
                         .getPath());
-        Map<String, String> props2 = descriptor2.toProperties();
 
         Callable<Catalog> callable1 =
                 () ->
-                        TableFactoryService.find(CatalogFactory.class, props1)
-                                .createCatalog("cat1", props1);
+                        FactoryUtil.createCatalog(
+                                "cat1",
+                                props1,
+                                null,
+                                
Thread.currentThread().getContextClassLoader());
 
         Callable<Catalog> callable2 =
                 () ->
-                        TableFactoryService.find(CatalogFactory.class, props2)
-                                .createCatalog("cat2", props2);
+                        FactoryUtil.createCatalog(
+                                "cat2",
+                                props2,
+                                null,
+                                
Thread.currentThread().getContextClassLoader());
 
         ExecutorService executorService = Executors.newFixedThreadPool(2);
         Future<Catalog> future1 = executorService.submit(callable1);
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
index 8238b98..3b361ad 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
@@ -18,27 +18,24 @@
 
 package org.apache.flink.connector.jdbc.catalog.factory;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.JdbcCatalogValidator;
 import org.apache.flink.table.factories.CatalogFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_BASE_URL;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_PASSWORD;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_USERNAME;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.BASE_URL;
+import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.DEFAULT_DATABASE;
+import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.USERNAME;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 
 /** Factory for {@link JdbcCatalog}. */
 public class JdbcCatalogFactory implements CatalogFactory {
@@ -46,45 +43,63 @@ public class JdbcCatalogFactory implements CatalogFactory {
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcCatalogFactory.class);
 
     @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = new HashMap<>();
-        context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC); // jdbc
-        context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
-        return context;
+    public String factoryIdentifier() {
+        return JdbcCatalogFactoryOptions.IDENTIFIER;
     }
 
     @Override
-    public List<String> supportedProperties() {
-        List<String> properties = new ArrayList<>();
-
-        // default database
-        properties.add(CATALOG_DEFAULT_DATABASE);
-
-        properties.add(CATALOG_JDBC_BASE_URL);
-        properties.add(CATALOG_JDBC_USERNAME);
-        properties.add(CATALOG_JDBC_PASSWORD);
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(BASE_URL);
+        return options;
+    }
 
-        return properties;
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROPERTY_VERSION);
+        return options;
     }
 
     @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        final DescriptorProperties prop = getValidatedProperties(properties);
+    public Catalog createCatalog(Context context) {
+        final Configuration configuration = 
Configuration.fromMap(context.getOptions());
+        validateConfiguration(configuration);
 
         return new JdbcCatalog(
-                name,
-                prop.getString(CATALOG_DEFAULT_DATABASE),
-                prop.getString(CATALOG_JDBC_USERNAME),
-                prop.getString(CATALOG_JDBC_PASSWORD),
-                prop.getString(CATALOG_JDBC_BASE_URL));
+                context.getName(),
+                configuration.getString(DEFAULT_DATABASE),
+                configuration.getString(USERNAME),
+                configuration.getString(PASSWORD),
+                configuration.getString(BASE_URL));
     }
 
-    private static DescriptorProperties getValidatedProperties(Map<String, 
String> properties) {
-        final DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-
-        new JdbcCatalogValidator().validate(descriptorProperties);
-
-        return descriptorProperties;
+    private void validateConfiguration(Configuration configuration) {
+        final String defaultDatabase = 
configuration.getString(DEFAULT_DATABASE);
+        if (defaultDatabase == null || defaultDatabase.isEmpty()) {
+            throw new ValidationException(
+                    String.format("Missing or empty value for '%s'", 
DEFAULT_DATABASE.key()));
+        }
+
+        final String username = configuration.getString(USERNAME);
+        if (username == null || username.isEmpty()) {
+            throw new ValidationException(
+                    String.format("Missing or empty value for '%s'", 
USERNAME.key()));
+        }
+
+        final String password = configuration.getString(PASSWORD);
+        if (password == null || password.isEmpty()) {
+            throw new ValidationException(
+                    String.format("Missing or empty value for '%s'", 
PASSWORD.key()));
+        }
+
+        final String baseUrl = configuration.getString(BASE_URL);
+        if (baseUrl == null || baseUrl.isEmpty()) {
+            throw new ValidationException(
+                    String.format("Missing or empty value for '%s'", 
BASE_URL.key()));
+        }
     }
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
new file mode 100644
index 0000000..0cdde62
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog.factory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+/** {@link ConfigOption}s for {@link JdbcCatalog}. */
+@Internal
+public class JdbcCatalogFactoryOptions {
+
+    public static final String IDENTIFIER = "jdbc";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> BASE_URL =
+            ConfigOptions.key("base-url").stringType().noDefaultValue();
+
+    private JdbcCatalogFactoryOptions() {}
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptor.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptor.java
deleted file mode 100644
index 0033a2c..0000000
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_BASE_URL;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_PASSWORD;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_USERNAME;
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/** Descriptor for {@link 
org.apache.flink.connector.jdbc.catalog.JdbcCatalog}. */
-public class JdbcCatalogDescriptor extends CatalogDescriptor {
-
-    private final String defaultDatabase;
-    private final String username;
-    private final String pwd;
-    private final String baseUrl;
-
-    public JdbcCatalogDescriptor(
-            String defaultDatabase, String username, String pwd, String 
baseUrl) {
-
-        super(CATALOG_TYPE_VALUE_JDBC, 1);
-
-        checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase));
-        checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
-        checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
-        checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
-
-        this.defaultDatabase = defaultDatabase;
-        this.username = username;
-        this.pwd = pwd;
-        this.baseUrl = baseUrl;
-    }
-
-    @Override
-    protected Map<String, String> toCatalogProperties() {
-        final DescriptorProperties properties = new DescriptorProperties();
-
-        properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase);
-        properties.putString(CATALOG_JDBC_USERNAME, username);
-        properties.putString(CATALOG_JDBC_PASSWORD, pwd);
-        properties.putString(CATALOG_JDBC_BASE_URL, baseUrl);
-
-        return properties.asMap();
-    }
-}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogValidator.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogValidator.java
deleted file mode 100644
index ce48ca2..0000000
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcCatalogValidator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
-
-/** Validator for {@link JdbcCatalog}. */
-public class JdbcCatalogValidator extends CatalogDescriptorValidator {
-
-    public static final String CATALOG_TYPE_VALUE_JDBC = "jdbc";
-
-    public static final String CATALOG_JDBC_USERNAME = "username";
-    public static final String CATALOG_JDBC_PASSWORD = "password";
-    public static final String CATALOG_JDBC_BASE_URL = "base-url";
-
-    @Override
-    public void validate(DescriptorProperties properties) {
-        super.validate(properties);
-        properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC, false);
-        properties.validateString(CATALOG_JDBC_BASE_URL, false, 1);
-        properties.validateString(CATALOG_JDBC_USERNAME, false, 1);
-        properties.validateString(CATALOG_JDBC_PASSWORD, false, 1);
-        properties.validateString(CATALOG_DEFAULT_DATABASE, false, 1);
-    }
-}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 86713a6..a54d9f5 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
+org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index ecc4e8c..6892b6d 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory
 org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
index 45380ee..bce5495 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
@@ -21,10 +21,8 @@ package org.apache.flink.connector.jdbc.catalog.factory;
 import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
 import org.apache.flink.connector.jdbc.catalog.PostgresCatalog;
 import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.descriptors.CatalogDescriptor;
-import org.apache.flink.table.descriptors.JdbcCatalogDescriptor;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
 import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
@@ -33,6 +31,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -67,15 +66,20 @@ public class JdbcCatalogFactoryTest {
 
     @Test
     public void test() {
-        final CatalogDescriptor catalogDescriptor =
-                new JdbcCatalogDescriptor(
-                        PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, 
TEST_PWD, baseUrl);
-
-        final Map<String, String> properties = 
catalogDescriptor.toProperties();
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(
+                JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
PostgresCatalog.DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), TEST_USERNAME);
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), TEST_PWD);
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
 
         final Catalog actualCatalog =
-                TableFactoryService.find(CatalogFactory.class, properties)
-                        .createCatalog(TEST_CATALOG_NAME, properties);
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
 
         checkEquals(catalog, (JdbcCatalog) actualCatalog);
 
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptorTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptorTest.java
deleted file mode 100644
index 078e9d1..0000000
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/table/descriptors/JdbcCatalogDescriptorTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
-
-/** Test for {@link JdbcCatalogDescriptor}. */
-public class JdbcCatalogDescriptorTest extends DescriptorTestBase {
-
-    private static final String TEST_DB = "db";
-    private static final String TEST_USERNAME = "user";
-    private static final String TEST_PWD = "pwd";
-    private static final String TEST_BASE_URL = "xxx";
-
-    @Override
-    protected List<Descriptor> descriptors() {
-        final Descriptor descriptor =
-                new JdbcCatalogDescriptor(TEST_DB, TEST_USERNAME, TEST_PWD, 
TEST_BASE_URL);
-
-        return Arrays.asList(descriptor);
-    }
-
-    @Override
-    protected List<Map<String, String>> properties() {
-        final Map<String, String> props = new HashMap<>();
-        props.put("type", CATALOG_TYPE_VALUE_JDBC);
-        props.put("property-version", "1");
-        props.put("default-database", TEST_DB);
-        props.put("username", TEST_USERNAME);
-        props.put("password", TEST_PWD);
-        props.put("base-url", TEST_BASE_URL);
-
-        return Arrays.asList(props);
-    }
-
-    @Override
-    protected DescriptorValidator validator() {
-        return new JdbcCatalogValidator();
-    }
-}
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java
index 99cf3ce..693f97a 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/CatalogEntry.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.table.client.config.entries;
 
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.client.config.ConfigUtil;
 import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import java.util.Collections;
 import java.util.Map;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-
 /**
  * Describes a catalog configuration entry.
  *
@@ -51,8 +50,8 @@ public class CatalogEntry extends ConfigEntry {
 
     @Override
     protected void validate(DescriptorProperties properties) {
-        properties.validateString(CATALOG_TYPE, false, 1);
-        properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0);
+        properties.validateString(CommonCatalogOptions.CATALOG_TYPE.key(), 
false, 1);
+        properties.validateInt(FactoryUtil.PROPERTY_VERSION.key(), true, 0);
 
         // further validation is performed by the discovered factory
     }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java
index 79865a1..8c4fdfa 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/ExecutionContextTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.client.gateway.context;
 
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.python.PythonFunctionFactory;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.PipelineOptions;
@@ -63,7 +64,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -493,20 +493,23 @@ public class ExecutionContextTest {
     public static class TestClassLoaderCatalogFactory implements 
CatalogFactory {
 
         @Override
-        public Catalog createCatalog(String name, Map<String, String> 
properties) {
-            return new TestClassLoaderCatalog("test_cl");
+        public String factoryIdentifier() {
+            return "test_cl_catalog";
         }
 
         @Override
-        public Map<String, String> requiredContext() {
-            Map<String, String> context = new HashMap<>();
-            context.put("type", "test_cl_catalog");
-            return context;
+        public Set<ConfigOption<?>> requiredOptions() {
+            return Collections.emptySet();
         }
 
         @Override
-        public List<String> supportedProperties() {
-            return Collections.emptyList();
+        public Set<ConfigOption<?>> optionalOptions() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Catalog createCatalog(Context context) {
+            return new TestClassLoaderCatalog("test_cl");
         }
     }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 1613488..df582d3 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
@@ -28,6 +30,7 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogPropertiesUtil;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -38,15 +41,14 @@ import 
org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.context.DefaultContext;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
 import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
 import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.module.Module;
@@ -63,14 +65,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
 import static 
org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -205,29 +206,32 @@ public class DependencyTest {
     /** Catalog that can be discovered if classloading is correct. */
     public static class TestCatalogFactory implements CatalogFactory {
 
+        private static final ConfigOption<String> DEFAULT_DATABASE =
+                ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                        .stringType()
+                        .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
         @Override
-        public Map<String, String> requiredContext() {
-            final Map<String, String> context = new HashMap<>();
-            context.put(CATALOG_TYPE, CATALOG_TYPE_TEST);
-            return context;
+        public String factoryIdentifier() {
+            return CATALOG_TYPE_TEST;
         }
 
         @Override
-        public List<String> supportedProperties() {
-            final List<String> properties = new ArrayList<>();
-            properties.add(CATALOG_DEFAULT_DATABASE);
-            return properties;
+        public Set<ConfigOption<?>> requiredOptions() {
+            return Collections.emptySet();
         }
 
         @Override
-        public Catalog createCatalog(String name, Map<String, String> 
properties) {
-            final DescriptorProperties params = new DescriptorProperties(true);
-            params.putProperties(properties);
-
-            final Optional<String> defaultDatabase =
-                    params.getOptionalString(CATALOG_DEFAULT_DATABASE);
+        public Set<ConfigOption<?>> optionalOptions() {
+            final Set<ConfigOption<?>> options = new HashSet<>();
+            options.add(DEFAULT_DATABASE);
+            return options;
+        }
 
-            return new TestCatalog(name, 
defaultDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB));
+        @Override
+        public Catalog createCatalog(Context context) {
+            final Configuration configuration = 
Configuration.fromMap(context.getOptions());
+            return new TestCatalog(context.getName(), 
configuration.getString(DEFAULT_DATABASE));
         }
     }
 
@@ -249,32 +253,22 @@ public class DependencyTest {
         static final String TABLE_WITH_PARAMETERIZED_TYPES = 
"param_types_table";
 
         @Override
-        public Map<String, String> requiredContext() {
-            Map<String, String> context = super.requiredContext();
-
-            // For factory discovery service to distinguish 
TestHiveCatalogFactory from
-            // HiveCatalogFactory
-            context.put("test", "test");
-            return context;
+        public String factoryIdentifier() {
+            return "hive-test";
         }
 
         @Override
-        public List<String> supportedProperties() {
-            List<String> list = super.supportedProperties();
-            list.add(CatalogPropertiesUtil.IS_GENERIC);
-
-            return list;
-        }
+        public Catalog createCatalog(Context context) {
+            final Configuration configuration = 
Configuration.fromMap(context.getOptions());
 
-        @Override
-        public Catalog createCatalog(String name, Map<String, String> 
properties) {
             // Developers may already have their own production/testing 
hive-site.xml set in their
             // environment,
             // and Flink tests should avoid using those hive-site.xml.
             // Thus, explicitly create a testing HiveConf for unit tests here
             Catalog hiveCatalog =
                     HiveTestUtils.createHiveCatalog(
-                            name, 
properties.get(HiveCatalogValidator.CATALOG_HIVE_VERSION));
+                            context.getName(),
+                            
configuration.getString(HiveCatalogFactoryOptions.HIVE_VERSION));
 
             // Creates an additional database to test tableEnv.useDatabase() 
will switch current
             // database of the catalog
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
index cf59d27..ea5ec57 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.gateway.local;
 
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
@@ -35,7 +36,6 @@ import java.util.Set;
 
 import static 
org.apache.flink.table.client.config.entries.CatalogEntry.CATALOG_NAME;
 import static 
org.apache.flink.table.client.config.entries.ModuleEntry.MODULE_NAME;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
 import static 
org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -130,7 +130,7 @@ public class EnvironmentTest {
         Map<String, Object> prop = new HashMap<>();
 
         prop.put(CATALOG_NAME, name);
-        prop.put(CATALOG_TYPE, type);
+        prop.put(CommonCatalogOptions.CATALOG_TYPE.key(), type);
 
         return prop;
     }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
index ffad5ae..e24979c 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
@@ -20,16 +20,19 @@ package org.apache.flink.table.client.gateway.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
@@ -37,9 +40,10 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.WrappingRuntimeException;
 
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * Catalog factory for an in-memory catalog that contains a single non-empty 
table. The contents of
@@ -47,22 +51,46 @@ import java.util.Map;
  */
 public class SimpleCatalogFactory implements CatalogFactory {
 
-    public static final String CATALOG_TYPE_VALUE = "simple-catalog";
-
-    public static final String TEST_TABLE_NAME = "test-table";
+    public static final String IDENTIFIER = "simple-catalog";
 
     public static final List<Row> TABLE_CONTENTS =
             Arrays.asList(
                     Row.of(1, "Hello"), Row.of(2, "Hello world"), Row.of(3, 
"Hello world! Hello!"));
 
+    private static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue("default_database");
+
+    private static final ConfigOption<String> TABLE_NAME =
+            
ConfigOptions.key("test-table").stringType().defaultValue("test-table");
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
     @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        String database =
-                properties.getOrDefault(
-                        CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, 
"default_database");
-        GenericInMemoryCatalog genericInMemoryCatalog = new 
GenericInMemoryCatalog(name, database);
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(TABLE_NAME);
+        return options;
+    }
+
+    @Override
+    public Catalog createCatalog(Context context) {
+        final Configuration configuration = 
Configuration.fromMap(context.getOptions());
+        final String database = configuration.getString(DEFAULT_DATABASE);
+        final String tableName = configuration.getString(TABLE_NAME);
 
-        String tableName = properties.getOrDefault(TEST_TABLE_NAME, 
TEST_TABLE_NAME);
+        final GenericInMemoryCatalog genericInMemoryCatalog =
+                new GenericInMemoryCatalog(context.getName(), database);
         StreamTableSource<Row> tableSource =
                 new StreamTableSource<Row>() {
                     @Override
@@ -102,16 +130,4 @@ public class SimpleCatalogFactory implements 
CatalogFactory {
 
         return genericInMemoryCatalog;
     }
-
-    @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = new HashMap<>();
-        context.put(CatalogDescriptorValidator.CATALOG_TYPE, 
CATALOG_TYPE_VALUE);
-        return context;
-    }
-
-    @Override
-    public List<String> supportedProperties() {
-        return 
Arrays.asList(CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, 
TEST_TABLE_NAME);
-    }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 83%
copy from 
flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to 
flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a0d1c61..d033f0a 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,10 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory
-org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory
 org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory
-org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory
 
org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory
-org.apache.flink.table.client.gateway.local.DependencyTest$TestModuleFactory
+org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory
 
org.apache.flink.table.client.gateway.context.ExecutionContextTest$TestClassLoaderCatalogFactory
diff --git 
a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index a0d1c61..2c2b86b 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -15,8 +15,4 @@
 
 org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory
 org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory
-org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory
-org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory
-org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory
 org.apache.flink.table.client.gateway.local.DependencyTest$TestModuleFactory
-org.apache.flink.table.client.gateway.context.ExecutionContextTest$TestClassLoaderCatalogFactory
diff --git 
a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q 
b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index 633e5c1..1d291c5 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -191,8 +191,7 @@ drop catalog `mod`;
 # ==========================================================================
 
 create catalog hivecatalog with (
- 'type' = 'hive',
- 'test' = 'test',  -- this makes sure we use TestHiveCatalogFactory
+ 'type' = 'hive-test',
  'hive-version' = '2.3.4'
 );
 [INFO] Catalog has been created.
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q 
b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index fea362b..ddbf1ba 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -205,7 +205,7 @@ org.apache.flink.table.api.ValidationException: Alter 
temporary catalog function
 # test function with hive catalog
 # ==========================================================================
 
-create catalog hivecatalog with ('type'='hive', 
'hive-version'='2.3.4','test'='test');
+create catalog hivecatalog with ('type'='hive-test', 'hive-version'='2.3.4');
 [INFO] Catalog has been created.
 !info
 
diff --git 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
index c88d06b..51eb80f 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
+++ 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
@@ -150,9 +150,7 @@ catalogs:
     type: generic_in_memory
     default-database: mydatabase
   - name: hivecatalog
-    type: hive
-    test: test
+    type: hive-test
     hive-version: 2.3.4
   - name: hivedefaultversion
-    type: hive
-    test: test
+    type: hive-test
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
index 27c7af5..6ee2c14 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactory.java
@@ -18,59 +18,47 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.CatalogFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-import static 
org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
+import static 
org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 
 /** Catalog factory for {@link GenericInMemoryCatalog}. */
 public class GenericInMemoryCatalogFactory implements CatalogFactory {
 
     @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = new HashMap<>();
-        context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY); // 
generic_in_memory
-        context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
-        return context;
+    public String factoryIdentifier() {
+        return GenericInMemoryCatalogFactoryOptions.IDENTIFIER;
     }
 
     @Override
-    public List<String> supportedProperties() {
-        List<String> properties = new ArrayList<>();
-
-        // default database
-        properties.add(CATALOG_DEFAULT_DATABASE);
-
-        return properties;
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
     }
 
     @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        final DescriptorProperties descriptorProperties = 
getValidatedProperties(properties);
-
-        final Optional<String> defaultDatabase =
-                
descriptorProperties.getOptionalString(CATALOG_DEFAULT_DATABASE);
-
-        return new GenericInMemoryCatalog(
-                name, 
defaultDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB));
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(PROPERTY_VERSION);
+        return options;
     }
 
-    private static DescriptorProperties getValidatedProperties(Map<String, 
String> properties) {
-        final DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-
-        new GenericInMemoryCatalogValidator().validate(descriptorProperties);
+    @Override
+    public Catalog createCatalog(Context context) {
+        final Configuration configuration = 
Configuration.fromMap(context.getOptions());
+        final String defaultDatabase = 
configuration.getString(DEFAULT_DATABASE);
+        if (defaultDatabase == null || defaultDatabase.isEmpty()) {
+            throw new ValidationException("The default database must not be 
empty");
+        }
 
-        return descriptorProperties;
+        return new GenericInMemoryCatalog(context.getName(), defaultDatabase);
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogValidator.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java
similarity index 54%
rename from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogValidator.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java
index 61a5a1a..f8fd4f6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogValidator.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java
@@ -16,15 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.catalog;
 
-/** Validator for {@link GenericInMemoryCatalogDescriptor}. */
-public class GenericInMemoryCatalogValidator extends 
CatalogDescriptorValidator {
-    public static final String CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY = 
"generic_in_memory";
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 
-    @Override
-    public void validate(DescriptorProperties properties) {
-        super.validate(properties);
-        properties.validateValue(CATALOG_TYPE, 
CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, false);
-    }
+/** {@link ConfigOption}s for {@link GenericInMemoryCatalog}. */
+@Internal
+public class GenericInMemoryCatalogFactoryOptions {
+
+    public static final String IDENTIFIER = "generic_in_memory";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    private GenericInMemoryCatalogFactoryOptions() {}
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogDescriptor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogDescriptor.java
deleted file mode 100644
index 7970a91..0000000
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogDescriptor.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static 
org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
-
-/** Catalog descriptor for the generic in memory catalog. */
-public class GenericInMemoryCatalogDescriptor extends CatalogDescriptor {
-
-    public GenericInMemoryCatalogDescriptor() {
-        super(CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, 1);
-    }
-
-    public GenericInMemoryCatalogDescriptor(String defaultDatabase) {
-        super(CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, 1, defaultDatabase);
-    }
-
-    @Override
-    protected Map<String, String> toCatalogProperties() {
-        return Collections.unmodifiableMap(new HashMap<>());
-    }
-}
diff --git 
a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 100%
rename from 
flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
rename to 
flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java
index bfca122..1184936 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryTest.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.descriptors.CatalogDescriptor;
-import org.apache.flink.table.descriptors.GenericInMemoryCatalogDescriptor;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -41,14 +39,15 @@ public class GenericInMemoryCatalogFactoryTest extends 
TestLogger {
         final GenericInMemoryCatalog expectedCatalog =
                 new GenericInMemoryCatalog(catalogName, databaseName);
 
-        final CatalogDescriptor catalogDescriptor =
-                new GenericInMemoryCatalogDescriptor(databaseName);
-
-        final Map<String, String> properties = 
catalogDescriptor.toProperties();
+        final Map<String, String> options = new HashMap<>();
+        options.put(
+                CommonCatalogOptions.CATALOG_TYPE.key(),
+                GenericInMemoryCatalogFactoryOptions.IDENTIFIER);
+        
options.put(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE.key(), 
databaseName);
 
         final Catalog actualCatalog =
-                TableFactoryService.find(CatalogFactory.class, properties)
-                        .createCatalog(catalogName, properties);
+                FactoryUtil.createCatalog(
+                        catalogName, options, null, 
Thread.currentThread().getContextClassLoader());
 
         checkEquals(expectedCatalog, (GenericInMemoryCatalog) actualCatalog);
     }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptor/GenericInMemoryCatalogDescriptorTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptor/GenericInMemoryCatalogDescriptorTest.java
deleted file mode 100644
index a71c363..0000000
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/descriptor/GenericInMemoryCatalogDescriptorTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptor;
-
-import org.apache.flink.table.descriptors.Descriptor;
-import org.apache.flink.table.descriptors.DescriptorTestBase;
-import org.apache.flink.table.descriptors.DescriptorValidator;
-import org.apache.flink.table.descriptors.GenericInMemoryCatalogDescriptor;
-import org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** Tests for the {@link GenericInMemoryCatalogDescriptor} descriptor. */
-public class GenericInMemoryCatalogDescriptorTest extends DescriptorTestBase {
-
-    private static final String TEST_DATABASE = "test";
-
-    @Override
-    protected List<Descriptor> descriptors() {
-        final Descriptor withoutDefaultDB = new 
GenericInMemoryCatalogDescriptor();
-
-        final Descriptor withDefaultDB = new 
GenericInMemoryCatalogDescriptor(TEST_DATABASE);
-
-        return Arrays.asList(withoutDefaultDB, withDefaultDB);
-    }
-
-    @Override
-    protected List<Map<String, String>> properties() {
-        final Map<String, String> props1 = new HashMap<>();
-        props1.put("type", "generic_in_memory");
-        props1.put("property-version", "1");
-
-        final Map<String, String> props2 = new HashMap<>();
-        props2.put("type", "generic_in_memory");
-        props2.put("property-version", "1");
-        props2.put("default-database", TEST_DATABASE);
-
-        return Arrays.asList(props1, props2);
-    }
-
-    @Override
-    protected DescriptorValidator validator() {
-        return new GenericInMemoryCatalogValidator();
-    }
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonCatalogOptions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonCatalogOptions.java
new file mode 100644
index 0000000..8641ff9
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonCatalogOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.Factory;
+
+/** A collection of {@link ConfigOption} that are commonly used across multiple catalogs. */
+@Internal
+public class CommonCatalogOptions {
+
+    /**
+     * Key of the {@link ConfigOption} used for specifying the default database of a catalog.
+     *
+     * <p>Note that we cannot expose an actual {@link ConfigOption} instance here because the
+     * default values differ between catalogs.
+     */
+    public static final String DEFAULT_DATABASE_KEY = "default-database";
+
+    /**
+     * {@link ConfigOption} whose value is matched against {@link Factory#factoryIdentifier()}
+     * during catalog discovery.
+     */
+    public static final ConfigOption<String> CATALOG_TYPE =
+            ConfigOptions.key("type").stringType().noDefaultValue();
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
deleted file mode 100644
index 112c70e..0000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/** Describes a catalog of tables, views, and functions. */
-@PublicEvolving
-public abstract class CatalogDescriptor extends DescriptorBase {
-
-    private final String type;
-
-    private final int propertyVersion;
-
-    private final String defaultDatabase;
-
-    /**
-     * Constructs a {@link CatalogDescriptor}.
-     *
-     * @param type string that identifies this catalog
-     * @param propertyVersion property version for backwards compatibility
-     */
-    public CatalogDescriptor(String type, int propertyVersion) {
-        this(type, propertyVersion, null);
-    }
-
-    /**
-     * Constructs a {@link CatalogDescriptor}.
-     *
-     * @param type string that identifies this catalog
-     * @param propertyVersion property version for backwards compatibility
-     * @param defaultDatabase default database of the catalog
-     */
-    public CatalogDescriptor(String type, int propertyVersion, String 
defaultDatabase) {
-        checkArgument(!StringUtils.isNullOrWhitespaceOnly(type), "type cannot 
be null or empty");
-
-        this.type = type;
-        this.propertyVersion = propertyVersion;
-        this.defaultDatabase = defaultDatabase;
-    }
-
-    @Override
-    public final Map<String, String> toProperties() {
-        final DescriptorProperties properties = new DescriptorProperties();
-        properties.putString(CATALOG_TYPE, type);
-        properties.putLong(CATALOG_PROPERTY_VERSION, propertyVersion);
-
-        if (defaultDatabase != null) {
-            properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase);
-        }
-
-        properties.putProperties(toCatalogProperties());
-        return properties.asMap();
-    }
-
-    public String getDefaultDatabase() {
-        return defaultDatabase;
-    }
-
-    /** Converts this descriptor into a set of catalog properties. */
-    protected abstract Map<String, String> toCatalogProperties();
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
deleted file mode 100644
index 384c7bc..0000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.annotation.Internal;
-
-/** Validator for {@link CatalogDescriptor}. */
-@Internal
-public abstract class CatalogDescriptorValidator implements 
DescriptorValidator {
-
-    /** Key for describing the type of the catalog. Usually used for factory 
discovery.ca */
-    public static final String CATALOG_TYPE = "type";
-
-    /**
-     * Key for describing the property version. This property can be used for 
backwards
-     * compatibility in case the property format changes.
-     */
-    public static final String CATALOG_PROPERTY_VERSION = "property-version";
-
-    /** Key for describing the default database of the catalog. */
-    public static final String CATALOG_DEFAULT_DATABASE = "default-database";
-
-    @Override
-    public void validate(DescriptorProperties properties) {
-        properties.validateString(CATALOG_TYPE, false, 1);
-        properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0);
-        properties.validateString(CATALOG_DEFAULT_DATABASE, true, 1);
-    }
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index c78d086..413193d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.connector.format.DecodingFormat;
@@ -60,6 +61,10 @@ public final class FactoryUtil {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
 
+    /**
+     * Describes the property version. This can be used for backwards 
compatibility in case the
+     * property format changes.
+     */
     public static final ConfigOption<Integer> PROPERTY_VERSION =
             ConfigOptions.key("property-version")
                     .intType()
@@ -75,13 +80,6 @@ public final class FactoryUtil {
                             "Uniquely identifies the connector of a dynamic 
table that is used for accessing data in "
                                     + "an external system. Its value is used 
during table source and table sink discovery.");
 
-    public static final ConfigOption<String> CATALOG_TYPE =
-            ConfigOptions.key("type")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Uniquely identifies the catalog. Its value is 
used during catalog discovery.");
-
     public static final ConfigOption<String> FORMAT =
             ConfigOptions.key("format")
                     .stringType()
@@ -239,7 +237,11 @@ public final class FactoryUtil {
                 // to the catalog factory itself.
                 final Map<String, String> factoryOptions =
                         options.entrySet().stream()
-                                .filter(entry -> 
!CATALOG_TYPE.key().equals(entry.getKey()))
+                                .filter(
+                                        entry ->
+                                                
!CommonCatalogOptions.CATALOG_TYPE
+                                                        .key()
+                                                        
.equals(entry.getKey()))
                                 .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
                 final DefaultCatalogContext context =
                         new DefaultCatalogContext(
@@ -258,7 +260,8 @@ public final class FactoryUtil {
                                                                 
optionEntry.getKey(),
                                                                 
optionEntry.getValue()))
                                         .sorted()
-                                        .collect(Collectors.joining("\n"))));
+                                        .collect(Collectors.joining("\n"))),
+                        t);
             }
         }
     }
@@ -420,12 +423,13 @@ public final class FactoryUtil {
     }
 
     private static CatalogFactory getCatalogFactory(CatalogFactory.Context 
context) {
-        final String catalogType = 
context.getOptions().get(CATALOG_TYPE.key());
+        final String catalogType =
+                
context.getOptions().get(CommonCatalogOptions.CATALOG_TYPE.key());
         if (catalogType == null) {
             throw new ValidationException(
                     String.format(
                             "Catalog options do not contain an option key '%s' 
for discovering a catalog.",
-                            CATALOG_TYPE.key()));
+                            CommonCatalogOptions.CATALOG_TYPE.key()));
         }
 
         return discoverFactory(context.getClassLoader(), CatalogFactory.class, 
catalogType);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
index 51ea695..9bfae3e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
@@ -42,7 +42,6 @@ import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
 import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
 
@@ -240,7 +239,7 @@ public class TableFactoryService {
             // with the version we can provide mappings in case the format 
changes
             plainContext.remove(CONNECTOR_PROPERTY_VERSION);
             plainContext.remove(FORMAT_PROPERTY_VERSION);
-            plainContext.remove(CATALOG_PROPERTY_VERSION);
+            plainContext.remove(FactoryUtil.PROPERTY_VERSION.key());
 
             // check if required context is met
             Map<String, Tuple2<String, String>> mismatchedProperties = new 
HashMap<>();
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/CatalogDescriptorTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/CatalogDescriptorTest.java
deleted file mode 100644
index 1f7ff63..0000000
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/CatalogDescriptorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.table.api.ValidationException;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-
-/**
- * Tests for the {@link CatalogDescriptor} descriptor and {@link 
CatalogDescriptorValidator}
- * validator.
- */
-public class CatalogDescriptorTest extends DescriptorTestBase {
-
-    private static final String CATALOG_TYPE_VALUE = "CatalogDescriptorTest";
-    private static final int CATALOG_PROPERTY_VERSION_VALUE = 1;
-    private static final String CATALOG_FOO = "foo";
-    private static final String CATALOG_FOO_VALUE = "foo-1";
-
-    @Test(expected = ValidationException.class)
-    public void testMissingCatalogType() {
-        removePropertyAndVerify(descriptors().get(0), CATALOG_TYPE);
-    }
-
-    @Test(expected = ValidationException.class)
-    public void testMissingFoo() {
-        removePropertyAndVerify(descriptors().get(0), CATALOG_FOO);
-    }
-
-    @Override
-    protected List<Descriptor> descriptors() {
-        final Descriptor minimumDesc = new 
TestCatalogDescriptor(CATALOG_FOO_VALUE);
-        return Collections.singletonList(minimumDesc);
-    }
-
-    @Override
-    protected List<Map<String, String>> properties() {
-        final Map<String, String> minimumProps = new HashMap<>();
-        minimumProps.put(CATALOG_TYPE, CATALOG_TYPE_VALUE);
-        minimumProps.put(CATALOG_PROPERTY_VERSION, "" + 
CATALOG_PROPERTY_VERSION_VALUE);
-        minimumProps.put(CATALOG_FOO, CATALOG_FOO_VALUE);
-        return Collections.singletonList(minimumProps);
-    }
-
-    @Override
-    protected DescriptorValidator validator() {
-        return new TestCatalogDescriptorValidator();
-    }
-
-    /** CatalogDescriptor for test. */
-    private class TestCatalogDescriptor extends CatalogDescriptor {
-        private String foo;
-
-        public TestCatalogDescriptor(@Nullable String foo) {
-            super(CATALOG_TYPE_VALUE, CATALOG_PROPERTY_VERSION_VALUE);
-            this.foo = foo;
-        }
-
-        @Override
-        protected Map<String, String> toCatalogProperties() {
-            DescriptorProperties properties = new DescriptorProperties();
-            if (foo != null) {
-                properties.putString(CATALOG_FOO, foo);
-            }
-            return properties.asMap();
-        }
-    }
-
-    /** CatalogDescriptorValidator for test. */
-    private class TestCatalogDescriptorValidator extends 
CatalogDescriptorValidator {
-        @Override
-        public void validate(DescriptorProperties properties) {
-            super.validate(properties);
-            properties.validateString(CATALOG_FOO, false, 1);
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index bb478f6..7b04ea0 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.table.factories;
 
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import 
org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock;
@@ -46,6 +48,7 @@ import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSin
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /** Tests for {@link FactoryUtil}. */
@@ -273,6 +276,27 @@ public class FactoryUtilTest {
         }
     }
 
+    @Test
+    public void testCreateCatalog() {
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
TestCatalogFactory.IDENTIFIER);
+        options.put(TestCatalogFactory.DEFAULT_DATABASE.key(), "my-database");
+
+        final Catalog catalog =
+                FactoryUtil.createCatalog(
+                        "my-catalog",
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+        assertTrue(catalog instanceof TestCatalogFactory.TestCatalog);
+
+        final TestCatalogFactory.TestCatalog testCatalog = 
(TestCatalogFactory.TestCatalog) catalog;
+        assertEquals(testCatalog.getName(), "my-catalog");
+        assertEquals(
+                
testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key()),
+                "my-database");
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private void expectError(String message) {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogFactory.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogFactory.java
new file mode 100644
index 0000000..ee9cad5
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestCatalogFactory.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Test catalog factory for catalog discovery tests. */
+public class TestCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "test-catalog";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .noDefaultValue();
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        return options;
+    }
+
+    @Override
+    public Catalog createCatalog(Context context) {
+        return new TestCatalog(context.getName(), context.getOptions());
+    }
+
+    /** Test catalog for discovery testing. */
+    public static class TestCatalog implements Catalog {
+        private final String name;
+        private final Map<String, String> options;
+
+        public TestCatalog(String name, Map<String, String> options) {
+            this.name = name;
+            this.options = options;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Map<String, String> getOptions() {
+            return options;
+        }
+
+        @Override
+        public void open() throws CatalogException {}
+
+        @Override
+        public void close() throws CatalogException {}
+
+        @Override
+        public String getDefaultDatabase() throws CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<String> listDatabases() throws CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogDatabase getDatabase(String databaseName)
+                throws DatabaseNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean databaseExists(String databaseName) throws 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists)
+                throws DatabaseAlreadyExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void dropDatabase(String name, boolean ignoreIfNotExists, 
boolean cascade)
+                throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterDatabase(
+                String name, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+                throws DatabaseNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<String> listTables(String databaseName)
+                throws DatabaseNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<String> listViews(String databaseName)
+                throws DatabaseNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogBaseTable getTable(ObjectPath tablePath)
+                throws TableNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean tableExists(ObjectPath tablePath) throws 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void renameTable(
+                ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
+                throws TableNotExistException, TableAlreadyExistException, 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void createTable(
+                ObjectPath tablePath, CatalogBaseTable table, boolean 
ignoreIfExists)
+                throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterTable(
+                ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+                throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<CatalogPartitionSpec> listPartitions(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws TableNotExistException, TableNotPartitionedException,
+                        PartitionSpecInvalidException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<CatalogPartitionSpec> listPartitionsByFilter(
+                ObjectPath tablePath, List<Expression> filters)
+                throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogPartition getPartition(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean partitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+                throws CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void createPartition(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogPartition partition,
+                boolean ignoreIfExists)
+                throws TableNotExistException, TableNotPartitionedException,
+                        PartitionSpecInvalidException, 
PartitionAlreadyExistsException,
+                        CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void dropPartition(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec, 
boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterPartition(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogPartition newPartition,
+                boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<String> listFunctions(String dbName)
+                throws DatabaseNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogFunction getFunction(ObjectPath functionPath)
+                throws FunctionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean functionExists(ObjectPath functionPath) throws 
CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void createFunction(
+                ObjectPath functionPath, CatalogFunction function, boolean 
ignoreIfExists)
+                throws FunctionAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterFunction(
+                ObjectPath functionPath, CatalogFunction newFunction, boolean 
ignoreIfNotExists)
+                throws FunctionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void dropFunction(ObjectPath functionPath, boolean 
ignoreIfNotExists)
+                throws FunctionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+                throws TableNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogColumnStatistics getTableColumnStatistics(ObjectPath 
tablePath)
+                throws TableNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogTableStatistics getPartitionStatistics(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CatalogColumnStatistics getPartitionColumnStatistics(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterTableStatistics(
+                ObjectPath tablePath,
+                CatalogTableStatistics tableStatistics,
+                boolean ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterTableColumnStatistics(
+                ObjectPath tablePath,
+                CatalogColumnStatistics columnStatistics,
+                boolean ignoreIfNotExists)
+                throws TableNotExistException, CatalogException, 
TablePartitionedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterPartitionStatistics(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogTableStatistics partitionStatistics,
+                boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void alterPartitionColumnStatistics(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogColumnStatistics columnStatistics,
+                boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9c66d01..8a7f7f2 100644
--- 
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -21,3 +21,4 @@ 
org.apache.flink.table.factories.TestDynamicTableSinkOnlyFactory
 org.apache.flink.table.factories.TestDynamicTableSourceOnlyFactory
 org.apache.flink.table.factories.TestConflictingDynamicTableFactory1
 org.apache.flink.table.factories.TestConflictingDynamicTableFactory2
+org.apache.flink.table.factories.TestCatalogFactory
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
index faeb218..0426985 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions;
 import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
@@ -32,7 +33,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.net.URLClassLoader;
 
-import static 
org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -48,7 +48,7 @@ public class CatalogITCase {
         String ddl =
                 String.format(
                         "create catalog %s with('type'='%s')",
-                        name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY);
+                        name, GenericInMemoryCatalogFactoryOptions.IDENTIFIER);
 
         tableEnv.executeSql(ddl);
 
@@ -64,7 +64,7 @@ public class CatalogITCase {
         String ddl =
                 String.format(
                         "create catalog %s with('type'='%s')",
-                        name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY);
+                        name, GenericInMemoryCatalogFactoryOptions.IDENTIFIER);
         tableEnv.executeSql(ddl);
         assertTrue(tableEnv.getCatalog(name).isPresent());
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java
index d2dd64c..98a775c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/CatalogFactoryServiceTest.java
@@ -19,22 +19,24 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.table.api.NoMatchingTableFactoryException;
-import org.apache.flink.table.factories.utils.TestCatalogFactory;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.utils.TestLegacyCatalogFactory;
 
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-import static 
org.apache.flink.table.factories.utils.TestCatalogFactory.CATALOG_TYPE_TEST;
+import static 
org.apache.flink.table.factories.utils.TestLegacyCatalogFactory.CATALOG_TYPE_TEST_LEGACY;
 import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for testing external catalog discovery using {@link 
TableFactoryService}. The tests assume
  * the catalog factory {@link CatalogFactory} is registered.
+ *
+ * @deprecated These tests are for the legacy factory stack
  */
+@Deprecated
 public class CatalogFactoryServiceTest {
     @Test
     public void testValidProperties() {
@@ -42,25 +44,25 @@ public class CatalogFactoryServiceTest {
 
         assertEquals(
                 TableFactoryService.find(CatalogFactory.class, 
props).getClass(),
-                TestCatalogFactory.class);
+                TestLegacyCatalogFactory.class);
     }
 
     @Test(expected = NoMatchingTableFactoryException.class)
     public void testInvalidContext() {
         Map<String, String> props = properties();
-        props.put(CATALOG_TYPE, "unknown-catalog-type");
+        props.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
"unknown-catalog-type");
         TableFactoryService.find(CatalogFactory.class, props);
     }
 
     @Test
     public void testDifferentContextVersion() {
         Map<String, String> props = properties();
-        props.put(CATALOG_PROPERTY_VERSION, "2");
+        props.put(FactoryUtil.PROPERTY_VERSION.key(), "2");
 
         // the catalog should still be found
         assertEquals(
                 TableFactoryService.find(CatalogFactory.class, 
props).getClass(),
-                TestCatalogFactory.class);
+                TestLegacyCatalogFactory.class);
     }
 
     @Test(expected = NoMatchingTableFactoryException.class)
@@ -73,8 +75,8 @@ public class CatalogFactoryServiceTest {
     private Map<String, String> properties() {
         Map<String, String> properties = new HashMap<>();
 
-        properties.put(CATALOG_TYPE, CATALOG_TYPE_TEST);
-        properties.put(CATALOG_PROPERTY_VERSION, "1");
+        properties.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
CATALOG_TYPE_TEST_LEGACY);
+        properties.put(FactoryUtil.PROPERTY_VERSION.key(), "1");
         return properties;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestCatalogFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestLegacyCatalogFactory.java
similarity index 77%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestCatalogFactory.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestLegacyCatalogFactory.java
index fec1de3..786c6ab 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestCatalogFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/factories/utils/TestLegacyCatalogFactory.java
@@ -19,33 +19,26 @@
 package org.apache.flink.table.factories.utils;
 
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+/** Legacy catalog factory for testing. */
+public class TestLegacyCatalogFactory implements CatalogFactory {
 
-/** Catalog factory for testing. */
-public class TestCatalogFactory implements CatalogFactory {
-
-    public static final String CATALOG_TYPE_TEST = "test";
-
-    @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        return new GenericInMemoryCatalog(name);
-    }
+    public static final String CATALOG_TYPE_TEST_LEGACY = "test-legacy";
 
     @Override
     public Map<String, String> requiredContext() {
         Map<String, String> context = new HashMap<>();
-        context.put(CATALOG_TYPE, CATALOG_TYPE_TEST);
-        context.put(CATALOG_PROPERTY_VERSION, "1");
-
+        context.put(CommonCatalogOptions.CATALOG_TYPE.key(), 
CATALOG_TYPE_TEST_LEGACY);
+        context.put(FactoryUtil.PROPERTY_VERSION.key(), "1");
         return context;
     }
 
@@ -53,4 +46,9 @@ public class TestCatalogFactory implements CatalogFactory {
     public List<String> supportedProperties() {
         return Collections.emptyList();
     }
+
+    @Override
+    public Catalog createCatalog(String name, Map<String, String> properties) {
+        return new GenericInMemoryCatalog(name);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index d9aab28..17edf07 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -19,5 +19,5 @@ org.apache.flink.table.factories.utils.TestTableSourceFactory
 org.apache.flink.table.factories.utils.TestTableFormatFactory
 org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory
 org.apache.flink.table.catalog.TestExternalTableSourceFactory
-org.apache.flink.table.factories.utils.TestCatalogFactory
 org.apache.flink.table.factories.utils.TestCollectionTableFactory
+org.apache.flink.table.factories.utils.TestLegacyCatalogFactory

Reply via email to