ruanhang1993 commented on code in PR #22937:
URL: https://github.com/apache/flink/pull/22937#discussion_r1252649833


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+
+/** Describes {@link Catalog} with catalogName and configuration. */

Review Comment:
   ```java
   /**
    * Describes a {@link Catalog} with the catalog name and configuration.
    *
    * <p>A {@link CatalogDescriptor} is a template for creating a {@link 
Catalog} instance. It
    * closely resembles the "CREATE CATALOG" SQL DDL statement, containing 
catalog name and catalog configuration. 
    * A {@link CatalogDescriptor} could be stored to {@link CatalogStore}. 
    * 
    * <p>This can be used to register a catalog in the Table API, see {@link
    * TableEnvironment#createCatalog(String, CatalogDescriptor)}.
    */
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+
+/** Describes {@link Catalog} with catalogName and configuration. */
+@PublicEvolving
+public class CatalogDescriptor {
+
+    /* Catalog name */
+    private final String catalogName;
+
+    /* The configuration used to discover and construct the catalog. */
+    private final Configuration configuration;
+
+    public String getCatalogName() {
+        return catalogName;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    private CatalogDescriptor(String catalogName, Configuration configuration) 
{
+        this.catalogName = catalogName;
+        this.configuration = configuration;
+    }
+
+    /**
+     * @param catalogName CatalogName of the register catalog.
+     * @param configuration Catalog configuration to the catalog instance.
+     * @return
+     */

Review Comment:
   ```java
   /**
        * Creates an instance of this interface.
        *
        * @param catalogName the name of the catalog
        * @param configuration the configuration of the catalog
        */
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.

Review Comment:
   the given catalog name



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog or empty if it does not exist
+     */
+    Optional<CatalogDescriptor> getCatalog(String catalogName);
+
+    /**
+     * Retrieves names of all registered catalogs.
+     *
+     * @return a set of names of registered catalogs
+     */
+    Set<String> listCatalogs();
+
+    /**
+     * Check if there is a corresponding catalog with the given name in 
CatalogStore.
+     *
+     * @return whether there is a corresponding Catalog with the given name
+     */
+    boolean contains(String catalogName);
+
+    /** Initialization method for the CatalogStore. */
+    void open();
+
+    /** Tear-down method for the CatalogStore. */

Review Comment:
   Close the catalog store.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog or empty if it does not exist
+     */
+    Optional<CatalogDescriptor> getCatalog(String catalogName);
+
+    /**
+     * Retrieves names of all registered catalogs.

Review Comment:
   the names of



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed

Review Comment:
   throw when the registration failed



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store

Review Comment:
   catalog descriptor to store



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.

Review Comment:
   Remove a catalog with the given catalog name.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed

Review Comment:
   throw when the removal operation failed



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */

Review Comment:
   ```java
   /**
    * The {@link CatalogStore} is used in {@link CatalogManager} to retrieve, 
save and remove {@link CatalogDescriptor} at the
    * external storage system. 
   */
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.

Review Comment:
   whether throw an exception when the catalog does not exist.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog

Review Comment:
   the given catalog name



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog or empty if it does not exist

Review Comment:
   if the catalog does not exist



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.

Review Comment:
   Get



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog or empty if it does not exist
+     */
+    Optional<CatalogDescriptor> getCatalog(String catalogName);
+
+    /**
+     * Retrieves names of all registered catalogs.
+     *
+     * @return a set of names of registered catalogs
+     */
+    Set<String> listCatalogs();
+
+    /**
+     * Check if there is a corresponding catalog with the given name in 
CatalogStore.

Review Comment:
   Return whether the catalog exists in the catalog store.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreFactoryOptions.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+
+/** {@link ConfigOption}s for {@link GenericInMemoryCatalogStore}. */
+public class GenericInMemoryCatalogStoreFactoryOptions {
+    public static final String IDENTIFIER = "generic_in_memory";

Review Comment:
   private GenericInMemoryCatalogStoreFactoryOptions() {}



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogStoreFactory.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogStore;
+
+import java.util.Map;
+
+/**
+ * A factory to create configured catalog store instances based on 
string-based properties. See also
+ * {@link Factory} for more information.
+ */
+@PublicEvolving
+public interface CatalogStoreFactory extends Factory {
+
+    /** Creates a {@link CatalogStore} instance from context information. */
+    CatalogStore createCatalogStore(Context context);
+
+    /** Initialization method for the CatalogStoreFactory. */
+    void open(Context context);
+
+    /** Tear-down method for the CatalogStoreFactory. */
+    void close();
+
+    /** Context provided when a catalog store is created. */
+    @PublicEvolving
+    interface Context {
+
+        /**
+         * Returns the options with which the catalog store is created.
+         *
+         * <p>An implementation should perform validation of these options.
+         */
+        Map<String, String> getOptions();
+
+        /** Gives read-only access to the configuration of the current 
session. */
+        ReadableConfig getConfiguration();
+
+        /**
+         * Returns the class loader of the current session.
+         *
+         * <p>The class loader is in particular useful for discovering further 
(nested) factories.
+         */
+        ClassLoader getClassLoader();
+    }
+
+    /** Default implementation of {@link CatalogStoreFactory.Context}. */
+    @Internal
+    class DefaultCatalogStoreContext implements CatalogStoreFactory.Context {

Review Comment:
   Maybe this default implementation should be put at `FactoryUtil`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog or empty if it does not exist
+     */
+    Optional<CatalogDescriptor> getCatalog(String catalogName);
+
+    /**
+     * Retrieves names of all registered catalogs.
+     *
+     * @return a set of names of registered catalogs

Review Comment:
   the names of registered catalogs



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog or empty if it does not exist
+     */
+    Optional<CatalogDescriptor> getCatalog(String catalogName);
+
+    /**
+     * Retrieves names of all registered catalogs.
+     *
+     * @return a set of names of registered catalogs
+     */
+    Set<String> listCatalogs();
+
+    /**
+     * Check if there is a corresponding catalog with the given name in 
CatalogStore.
+     *
+     * @return whether there is a corresponding Catalog with the given name
+     */
+    boolean contains(String catalogName);
+
+    /** Initialization method for the CatalogStore. */

Review Comment:
   Initialize the catalog store.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This interface describes the behavior of retrieving, saving, or deleting 
Catalog configurations
+ * from an external storage system.
+ */
+@PublicEvolving
+public interface CatalogStore {
+
+    /**
+     * Store a catalog under the give name. The catalog name must be unique.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog instance to store
+     * @throws CatalogException if the registration of the catalog under the 
given name failed
+     */
+    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws 
CatalogException;
+
+    /**
+     * Unregisters a catalog under the given name. The catalog name must be 
existed.
+     *
+     * @param catalogName name under which to unregister the given catalog.
+     * @param ignoreIfNotExists If false exception will be thrown if the table 
or database or
+     *     catalog to be altered does not exist.
+     * @throws CatalogException if the unregistration of the catalog under the 
given name failed
+     */
+    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws 
CatalogException;
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog or empty if it does not exist
+     */
+    Optional<CatalogDescriptor> getCatalog(String catalogName);
+
+    /**
+     * Retrieves names of all registered catalogs.
+     *
+     * @return a set of names of registered catalogs
+     */
+    Set<String> listCatalogs();
+
+    /**
+     * Check if there is a corresponding catalog with the given name in 
CatalogStore.
+     *
+     * @return whether there is a corresponding Catalog with the given name

Review Comment:
   Remove this `@reutrn`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to