ruanhang1993 commented on code in PR #22937: URL: https://github.com/apache/flink/pull/22937#discussion_r1252649833
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; + +/** Describes {@link Catalog} with catalogName and configuration. */ Review Comment: ```java /** * Describes a {@link Catalog} with the catalog name and configuration. * * <p>A {@link CatalogDescriptor} is a template for creating a {@link Catalog} instance. It * closely resembles the "CREATE CATALOG" SQL DDL statement, containing catalog name and catalog configuration. * A {@link CatalogDescriptor} could be stored to {@link CatalogStore}. * * <p>This can be used to register a catalog in the Table API, see {@link * TableEnvironment#createCatalog(String, CatalogDescriptor)}. */ ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; + +/** Describes {@link Catalog} with catalogName and configuration. */ +@PublicEvolving +public class CatalogDescriptor { + + /* Catalog name */ + private final String catalogName; + + /* The configuration used to discover and construct the catalog. */ + private final Configuration configuration; + + public String getCatalogName() { + return catalogName; + } + + public Configuration getConfiguration() { + return configuration; + } + + private CatalogDescriptor(String catalogName, Configuration configuration) { + this.catalogName = catalogName; + this.configuration = configuration; + } + + /** + * @param catalogName CatalogName of the register catalog. + * @param configuration Catalog configuration to the catalog instance. + * @return + */ Review Comment: ```java /** * Creates an instance of this class. 
* * @param catalogName the name of the catalog * @param configuration the configuration of the catalog */ ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. 
+ * + * @param catalogName name under which to unregister the given catalog. Review Comment: the given catalog name ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. 
+ * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if it does not exist + */ + Optional<CatalogDescriptor> getCatalog(String catalogName); + + /** + * Retrieves names of all registered catalogs. + * + * @return a set of names of registered catalogs + */ + Set<String> listCatalogs(); + + /** + * Check if there is a corresponding catalog with the given name in CatalogStore. + * + * @return whether there is a corresponding Catalog with the given name + */ + boolean contains(String catalogName); + + /** Initialization method for the CatalogStore. */ + void open(); + + /** Tear-down method for the CatalogStore. */ Review Comment: Close the catalog store. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if it does not exist + */ + Optional<CatalogDescriptor> getCatalog(String catalogName); + + /** + * Retrieves names of all registered catalogs. Review Comment: the names of ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed Review Comment: throw when the registration failed ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store Review Comment: catalog descriptor to store ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. Review Comment: Remove a catalog with the given catalog name. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed Review Comment: throw when the removal operation failed ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ Review Comment: ```java /** * The {@link CatalogStore} is used in {@link CatalogManager} to retrieve, save and remove {@link CatalogDescriptor} at the * external storage system. */ ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. Review Comment: whether to throw an exception when the catalog does not exist. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog Review Comment: the given catalog name ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if it does not exist Review Comment: if the catalog does not exist ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. 
Review Comment: Get ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. 
+ * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if it does not exist + */ + Optional<CatalogDescriptor> getCatalog(String catalogName); + + /** + * Retrieves names of all registered catalogs. + * + * @return a set of names of registered catalogs + */ + Set<String> listCatalogs(); + + /** + * Check if there is a corresponding catalog with the given name in CatalogStore. Review Comment: Return whether the catalog exists in the catalog store. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreFactoryOptions.java: ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.configuration.ConfigOption; + +/** {@link ConfigOption}s for {@link GenericInMemoryCatalogStore}. */ +public class GenericInMemoryCatalogStoreFactoryOptions { + public static final String IDENTIFIER = "generic_in_memory"; Review Comment: private GenericInMemoryCatalogStoreFactoryOptions() {} ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogStoreFactory.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogStore; + +import java.util.Map; + +/** + * A factory to create configured catalog store instances based on string-based properties. See also + * {@link Factory} for more information. + */ +@PublicEvolving +public interface CatalogStoreFactory extends Factory { + + /** Creates a {@link CatalogStore} instance from context information. 
*/ + CatalogStore createCatalogStore(Context context); + + /** Initialization method for the CatalogStoreFactory. */ + void open(Context context); + + /** Tear-down method for the CatalogStoreFactory. */ + void close(); + + /** Context provided when a catalog store is created. */ + @PublicEvolving + interface Context { + + /** + * Returns the options with which the catalog store is created. + * + * <p>An implementation should perform validation of these options. + */ + Map<String, String> getOptions(); + + /** Gives read-only access to the configuration of the current session. */ + ReadableConfig getConfiguration(); + + /** + * Returns the class loader of the current session. + * + * <p>The class loader is in particular useful for discovering further (nested) factories. + */ + ClassLoader getClassLoader(); + } + + /** Default implementation of {@link CatalogStoreFactory.Context}. */ + @Internal + class DefaultCatalogStoreContext implements CatalogStoreFactory.Context { Review Comment: Maybe this default implementation should be put at `FactoryUtil`? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if it does not exist + */ + Optional<CatalogDescriptor> getCatalog(String catalogName); + + /** + * Retrieves names of all registered catalogs. 
+ * + * @return a set of names of registered catalogs Review Comment: the names of registered catalogs ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. 
+ * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if it does not exist + */ + Optional<CatalogDescriptor> getCatalog(String catalogName); + + /** + * Retrieves names of all registered catalogs. + * + * @return a set of names of registered catalogs + */ + Set<String> listCatalogs(); + + /** + * Check if there is a corresponding catalog with the given name in CatalogStore. + * + * @return whether there is a corresponding Catalog with the given name + */ + boolean contains(String catalogName); + + /** Initialization method for the CatalogStore. */ Review Comment: Initialize the catalog store. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogStore.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * This interface describes the behavior of retrieving, saving, or deleting Catalog configurations + * from an external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Store a catalog under the give name. The catalog name must be unique. + * + * @param catalogName name under which to register the given catalog + * @param catalog catalog instance to store + * @throws CatalogException if the registration of the catalog under the given name failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Unregisters a catalog under the given name. The catalog name must be existed. + * + * @param catalogName name under which to unregister the given catalog. + * @param ignoreIfNotExists If false exception will be thrown if the table or database or + * catalog to be altered does not exist. + * @throws CatalogException if the unregistration of the catalog under the given name failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Gets a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if it does not exist + */ + Optional<CatalogDescriptor> getCatalog(String catalogName); + + /** + * Retrieves names of all registered catalogs. + * + * @return a set of names of registered catalogs + */ + Set<String> listCatalogs(); + + /** + * Check if there is a corresponding catalog with the given name in CatalogStore. + * + * @return whether there is a corresponding Catalog with the given name Review Comment: Remove this `@return`.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org