Hi Feng,

This is indeed a good proposal.

1) It makes sense to improve the catalog listing for platform providers.

2) Other feedback from the past has shown that users would like to avoid the default in-memory catalog and provide their own catalog before a TableEnvironment session starts.

3) Also, we might reconsider whether a default catalog and default database make sense, or whether they can be disabled so that SHOW CATALOGS can be used for listing first, without having a default catalog.

What do you think about points 2 and 3?

In any case, I would propose we pass a CatalogProvider to EnvironmentSettings and only allow a single instance. Catalogs should never shadow other catalogs.
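
To make the single-instance idea a bit more concrete, here is a rough sketch. Everything in it is an assumption for illustration only: `SettingsSketch` stands in for EnvironmentSettings, `withCatalogProvider` is a hypothetical builder method that does not exist today, and `Catalog` / `CatalogProvider` are reduced to stand-ins so the snippet compiles without Flink.

```java
import java.util.Optional;
import java.util.Set;

// Stand-ins so the sketch compiles without Flink on the classpath.
interface Catalog {}

interface CatalogProvider {
    Optional<Catalog> getCatalog(String catalogName);

    Set<String> listCatalogs();
}

// Hypothetical stand-in for EnvironmentSettings.
final class SettingsSketch {
    // null means "no provider configured"
    private final CatalogProvider catalogProvider;

    private SettingsSketch(CatalogProvider catalogProvider) {
        this.catalogProvider = catalogProvider;
    }

    Optional<CatalogProvider> getCatalogProvider() {
        return Optional.ofNullable(catalogProvider);
    }

    static Builder newInstance() {
        return new Builder();
    }

    static final class Builder {
        private CatalogProvider catalogProvider;

        // Hypothetical method: rejects a second provider instead of silently
        // replacing the first one, so providers can never shadow each other.
        Builder withCatalogProvider(CatalogProvider provider) {
            if (provider == null) {
                throw new NullPointerException("provider must not be null");
            }
            if (this.catalogProvider != null) {
                throw new IllegalStateException("Only a single CatalogProvider is allowed.");
            }
            this.catalogProvider = provider;
            return this;
        }

        SettingsSketch build() {
            return new SettingsSketch(catalogProvider);
        }
    }
}
```

Failing fast in the builder is just one possible choice; the important part is that there is exactly one place where a provider can be set.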

We could also use the org.apache.flink.table.factories.Factory infra and allow catalog providers via pure string properties. Not sure if we need this in the first version though.

Cheers,
Timo


On 06.02.23 11:21, Feng Jin wrote:
Hi everyone,

The original discussion address is
https://issues.apache.org/jira/browse/FLINK-30126

Currently, Flink can access many systems, including Kafka, Hive,
Iceberg, Hudi, Elasticsearch, MySQL...  The corresponding catalog names
might be:
kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
mysql_database2_xxxx

As a platform for Flink SQL jobs, we need to maintain the meta
information of each system in the company, and when a Flink job
starts, we need to register the catalogs with the Flink table
environment so that users can use any table through the
env.executeSql interface.

When we only have a small number of catalogs, we can register them
all up front like this. But when there are thousands of catalogs, I
think there needs to be a dynamic loading mechanism, so that a catalog
is registered only when it is needed. This speeds up the
initialization of the table environment and avoids useless catalog
registrations.

Preliminary thoughts:

A new CatalogProvider interface can be added.
It contains two methods:
* listCatalogs(), which lists all the catalogs that the provider can
supply
* getCatalog(), which gets a catalog instance by catalog name.

```java
public interface CatalogProvider {

     default void initialize(ClassLoader classLoader, ReadableConfig config) {}

     Optional<Catalog> getCatalog(String catalogName);

     Set<String> listCatalogs();
}
```
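
To illustrate the lazy loading, a platform-side implementation could look roughly like the sketch below. It is only a sketch under assumptions: `Catalog` is reduced to a stand-in interface so the snippet compiles on its own, `initialize()` is left out because it needs Flink's ReadableConfig, and `LazyCatalogProvider` with its name set and loader function is hypothetical, not an existing Flink class.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Stand-in for org.apache.flink.table.catalog.Catalog.
interface Catalog {
    String name();
}

// The proposed interface, without initialize().
interface CatalogProvider {
    Optional<Catalog> getCatalog(String catalogName);

    Set<String> listCatalogs();
}

// Hypothetical provider: it knows all catalog names up front, but creates a
// catalog instance only the first time that catalog is requested.
class LazyCatalogProvider implements CatalogProvider {
    private final Set<String> knownNames;
    private final Function<String, Catalog> loader;
    private final Map<String, Catalog> cache = new ConcurrentHashMap<>();

    LazyCatalogProvider(Set<String> knownNames, Function<String, Catalog> loader) {
        this.knownNames = Collections.unmodifiableSet(new HashSet<>(knownNames));
        this.loader = loader;
    }

    @Override
    public Optional<Catalog> getCatalog(String catalogName) {
        if (!knownNames.contains(catalogName)) {
            return Optional.empty();
        }
        // computeIfAbsent invokes the loader at most once per catalog name.
        return Optional.ofNullable(cache.computeIfAbsent(catalogName, loader));
    }

    @Override
    public Set<String> listCatalogs() {
        // Listing only returns names and never instantiates a catalog.
        return knownNames;
    }
}
```

With something like this, listing thousands of catalogs stays cheap, and the cost of creating a catalog is paid only for the ones a job actually references.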


The corresponding implementation in CatalogManager is as follows:

```java
public class CatalogManager {
     private @Nullable CatalogProvider catalogProvider;

     private Map<String, Catalog> catalogs;

     public void setCatalogProvider(CatalogProvider catalogProvider) {
         this.catalogProvider = catalogProvider;
     }

     public Optional<Catalog> getCatalog(String catalogName) {
         // Catalogs registered in the table environment are looked up first.
         Catalog registered = catalogs.get(catalogName);
         if (registered != null) {
             return Optional.of(registered);
         }
         // If there is no corresponding catalog in catalogs,
         // get catalog by catalogProvider
         if (catalogProvider != null) {
             return catalogProvider.getCatalog(catalogName);
         }
         return Optional.empty();
     }
}
```



Possible problems:

1. Catalog name conflict: which one should win when a catalog
registered in tableEnv and a catalog provided by the catalog-provider
have the same name?
I prefer tableEnv-registered ones over catalogs provided by the
catalog-provider. If the user wishes to reference the catalog provided
by the catalog-provider, they can unregister the catalog in tableEnv
through the `unregisterCatalog` interface.

2. Number of CatalogProviders: should it be possible to have multiple
catalogProvider implementations?
I don't have a good idea about this at the moment. Supporting multiple
catalogProviders would be more convenient, but there may be catalog
name conflicts between different catalogProviders.



Looking forward to your reply, any feedback is appreciated!


Best.

Feng Jin

