Hi everyone,

The original discussion can be found at:
https://issues.apache.org/jira/browse/FLINK-30126

Currently, Flink can access many systems, including Kafka, Hive,
Iceberg, Hudi, Elasticsearch, MySQL, and so on. The corresponding
catalog names might be:
kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
iceberg_cluster2, elasticsearch_cluster1, mysql_database1_xxx,
mysql_database2_xxxx

As the platform that runs Flink SQL jobs, we need to maintain the
metadata of every system in the company. When a Flink job starts, we
need to register the corresponding catalogs with the Flink table
environment so that users can access any table through the
`env.executeSql` interface.

This works when there are only a few catalogs, but with thousands of
catalogs I think we need a dynamic loading mechanism that registers a
catalog only when it is actually needed. This would speed up the
initialization of the table environment and avoid registering catalogs
that are never used.

Preliminary thoughts:

We could add a new CatalogProvider interface with two main methods
(plus an optional initialize() hook):
* listCatalogs(), which lists the names of all catalogs the provider can supply;
* getCatalog(), which returns a catalog instance by catalog name.

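```java
import java.util.Optional;
import java.util.Set;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.Catalog;

public interface CatalogProvider {

    /** Optional hook to set up the provider before any catalog is requested. */
    default void initialize(ClassLoader classLoader, ReadableConfig config) {}

    /** Returns the catalog with the given name, or empty if this provider does not know it. */
    Optional<Catalog> getCatalog(String catalogName);

    /** Returns the names of all catalogs this provider can supply. */
    Set<String> listCatalogs();
}
```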
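To make the lazy-loading idea more concrete, here is a minimal sketch of one possible provider. It rests on a few assumptions: the interface is made generic over the catalog type (`C` stands in for Flink's `Catalog`) so the snippet compiles without Flink on the classpath, and `LazyCatalogProvider` with its name-to-factory map is hypothetical. In practice the factories might be built from the platform's metadata store. `listCatalogs()` only touches names, while `getCatalog()` builds a catalog the first time it is requested and reuses it afterwards.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Stand-in for the proposed interface; C plays the role of Flink's Catalog. */
interface CatalogProvider<C> {
    Optional<C> getCatalog(String catalogName);

    Set<String> listCatalogs();
}

/** Hypothetical provider: keeps one cheap factory per catalog name and builds each catalog on first use. */
final class LazyCatalogProvider<C> implements CatalogProvider<C> {
    private final Map<String, Supplier<C>> factories;
    private final Map<String, C> created = new ConcurrentHashMap<>();

    LazyCatalogProvider(Map<String, Supplier<C>> factories) {
        this.factories = Map.copyOf(factories);
    }

    @Override
    public Optional<C> getCatalog(String catalogName) {
        Supplier<C> factory = factories.get(catalogName);
        if (factory == null) {
            return Optional.empty(); // unknown name: nothing is built
        }
        // computeIfAbsent builds the catalog on the first request and reuses it afterwards.
        return Optional.ofNullable(created.computeIfAbsent(catalogName, name -> factory.get()));
    }

    @Override
    public Set<String> listCatalogs() {
        // Only the names are returned; listing never instantiates a catalog.
        return new TreeSet<>(factories.keySet());
    }
}
```

The intent of this shape is that startup only pays for loading the name-to-factory map, not for constructing every catalog.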


The corresponding implementation in CatalogManager is as follows:

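```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

import javax.annotation.Nullable;

public class CatalogManager {
    private @Nullable CatalogProvider catalogProvider;

    // Catalogs registered through the TableEnvironment, keyed by name.
    private final Map<String, Catalog> catalogs = new LinkedHashMap<>();

    public void setCatalogProvider(@Nullable CatalogProvider catalogProvider) {
        this.catalogProvider = catalogProvider;
    }

    public Optional<Catalog> getCatalog(String catalogName) {
        // Catalogs registered in the TableEnvironment take precedence.
        Catalog catalog = catalogs.get(catalogName);
        if (catalog != null) {
            return Optional.of(catalog);
        }
        // If there is no corresponding catalog in catalogs,
        // fall back to the catalogProvider (if one is set).
        if (catalogProvider != null) {
            return catalogProvider.getCatalog(catalogName);
        }
        return Optional.empty();
    }
}
```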
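To illustrate "register the catalog when needed", here is a Flink-free sketch of one possible lookup path in which a catalog obtained from the provider is registered on first use, so later lookups no longer go through the provider. Everything here is an assumption for illustration: `LazyCatalogManager` is hypothetical, `C` stands in for Flink's `Catalog`, and a real implementation would presumably also need to open the catalog (`Catalog#open()`) before caching it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Stand-in for the proposed interface; C plays the role of Flink's Catalog. */
interface CatalogProvider<C> {
    Optional<C> getCatalog(String catalogName);

    Set<String> listCatalogs();
}

/** Hypothetical, Flink-free model of the CatalogManager lookup path with register-on-first-use. */
final class LazyCatalogManager<C> {
    // Catalogs registered explicitly, plus those already loaded from the provider.
    private final Map<String, C> catalogs = new LinkedHashMap<>();
    private final CatalogProvider<C> catalogProvider; // may be null

    LazyCatalogManager(CatalogProvider<C> catalogProvider) {
        this.catalogProvider = catalogProvider;
    }

    void registerCatalog(String name, C catalog) {
        if (catalogs.putIfAbsent(name, catalog) != null) {
            throw new IllegalStateException("Catalog " + name + " already exists");
        }
    }

    Optional<C> getCatalog(String name) {
        C existing = catalogs.get(name);
        if (existing != null) {
            return Optional.of(existing); // registered (or previously loaded) catalogs win
        }
        if (catalogProvider == null) {
            return Optional.empty();
        }
        Optional<C> loaded = catalogProvider.getCatalog(name);
        // Register on first use so later lookups skip the provider.
        // A real implementation would presumably open the catalog here as well.
        loaded.ifPresent(c -> catalogs.put(name, c));
        return loaded;
    }

    Set<String> listCatalogs() {
        // Union of registered names and names the provider could supply, without loading anything.
        Set<String> names = new TreeSet<>(catalogs.keySet());
        if (catalogProvider != null) {
            names.addAll(catalogProvider.listCatalogs());
        }
        return names;
    }
}
```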



Possible problems:

1. Catalog name conflicts: when a catalog registered in the table
environment and a catalog supplied by the CatalogProvider have the same
name, which one should be used?
I prefer the catalogs registered through tableEnv over those supplied by
the CatalogProvider. If users want to reference the provider's catalog,
they can unregister the tableEnv catalog through the `unregisterCatalog`
interface.
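The precedence rule in point 1 can be sketched as follows. This is a hypothetical, Flink-free `CatalogResolver` (not existing Flink code): a `Function` stands in for `CatalogProvider#getCatalog`, and `unregisterCatalog` here only models the idea that removing the tableEnv entry makes the provider's catalog with the same name visible again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical resolver showing the proposed rule: tableEnv-registered catalogs shadow provider catalogs. */
final class CatalogResolver<C> {
    private final Map<String, C> registered = new HashMap<>();
    // Stand-in for CatalogProvider#getCatalog.
    private final Function<String, Optional<C>> provider;

    CatalogResolver(Function<String, Optional<C>> provider) {
        this.provider = provider;
    }

    void registerCatalog(String name, C catalog) {
        registered.put(name, catalog);
    }

    /** Removes only the tableEnv-registered entry; the provider's catalog is untouched. */
    boolean unregisterCatalog(String name) {
        return registered.remove(name) != null;
    }

    Optional<C> resolve(String name) {
        C local = registered.get(name);
        // A registered catalog wins; otherwise fall back to the provider.
        return local != null ? Optional.of(local) : provider.apply(name);
    }
}
```

A side effect of this rule is that registering a catalog never changes what the provider returns, so the shadowing is fully reversible.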

2. Number of CatalogProviders: should multiple CatalogProvider
implementations be supported?
I don't have a good answer to this yet. Supporting multiple
CatalogProviders would be much more convenient, but catalog names may
then conflict across providers.
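One possible answer to point 2, sketched under assumptions: a hypothetical `CompositeCatalogProvider` consults providers in a fixed order, so the first provider that knows a name wins, and it can report names offered by more than one provider so the platform could log them or fail at startup. The generic interface (`C` stands in for Flink's `Catalog`), `MapCatalogProvider`, and the first-match policy are all illustrative assumptions, not part of the proposal.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Stand-in for the proposed interface; C plays the role of Flink's Catalog. */
interface CatalogProvider<C> {
    Optional<C> getCatalog(String catalogName);

    Set<String> listCatalogs();
}

/** Hypothetical provider backed by a fixed map, e.g. one per metadata service. */
final class MapCatalogProvider<C> implements CatalogProvider<C> {
    private final Map<String, C> catalogs;

    MapCatalogProvider(Map<String, C> catalogs) {
        this.catalogs = Map.copyOf(catalogs);
    }

    @Override
    public Optional<C> getCatalog(String catalogName) {
        return Optional.ofNullable(catalogs.get(catalogName));
    }

    @Override
    public Set<String> listCatalogs() {
        return catalogs.keySet();
    }
}

/** Hypothetical composite: providers are consulted in order and the first match wins. */
final class CompositeCatalogProvider<C> implements CatalogProvider<C> {
    private final List<CatalogProvider<C>> providers;

    CompositeCatalogProvider(List<CatalogProvider<C>> providers) {
        this.providers = List.copyOf(providers);
    }

    @Override
    public Optional<C> getCatalog(String catalogName) {
        for (CatalogProvider<C> provider : providers) {
            Optional<C> catalog = provider.getCatalog(catalogName);
            if (catalog.isPresent()) {
                return catalog;
            }
        }
        return Optional.empty();
    }

    @Override
    public Set<String> listCatalogs() {
        Set<String> names = new TreeSet<>();
        for (CatalogProvider<C> provider : providers) {
            names.addAll(provider.listCatalogs());
        }
        return names;
    }

    /** Names offered by more than one provider, so they can be logged or rejected up front. */
    Set<String> conflictingNames() {
        Map<String, Integer> counts = new HashMap<>();
        for (CatalogProvider<C> provider : providers) {
            for (String name : provider.listCatalogs()) {
                counts.merge(name, 1, Integer::sum);
            }
        }
        Set<String> conflicts = new TreeSet<>();
        counts.forEach((name, count) -> {
            if (count > 1) {
                conflicts.add(name);
            }
        });
        return conflicts;
    }
}
```

Failing fast whenever `conflictingNames()` is non-empty would be the stricter alternative; first-match is shown only because it is the simplest policy to reason about.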



Looking forward to your replies; any feedback is appreciated!


Best.

Feng Jin
