Thank you, Feng. Feel free to start a FLIP proposal if you are interested. Looking forward to it!

Best,
Jark

> On Feb 10, 2023, at 15:44, Feng Jin <jinfeng1...@gmail.com> wrote:
>
> @Shengkai
>> About the catalog jar hot updates
>
> Currently we do not have a similar requirement, but if the catalog
> management interface is opened up, it can indeed enable hot
> loading of the catalog jar.
>
>> do we need to instantiate the Catalog immediately or defer to the usage
>
> I think this can stay the same as before.
>
> @Jark
>> There only can be a single catalog manager in TableEnvironment.
>
> Big +1 for this. It can avoid conflicts and also meet the catalog
> persistence requirements.
>
> Best,
> Feng
>
> On Fri, Feb 10, 2023 at 3:09 PM Jark Wu <imj...@gmail.com> wrote:
>>
>> Hi Feng,
>>
>> It's still easy to conflict and be inconsistent even if we have only one
>> CatalogProvider, because CatalogProvider only provides read-only interfaces
>> (listCatalogs, getCatalog). For example, you may register a catalog X, but
>> can't list it because it's not in the external metadata service.
>>
>> To avoid catalog conflicts and stay consistent, we can extract the catalog
>> management logic as a pluggable interface, including listCatalog,
>> getCatalog, registerCatalog, unregisterCatalog, etc. The
>> current CatalogManager is a default in-memory implementation; you can
>> replace it with user-defined managers, such as
>> - file-based: which manages catalog information on local files, just like
>>   how Presto/Trino manages catalogs
>> - metaservice-based: which manages catalog information on an external
>>   metadata service.
>>
>> There can only be a single catalog manager in TableEnvironment. This
>> guarantees data consistency and avoids conflicts. This approach can address
>> another pain point of Flink SQL: the catalog information is not persisted.
>>
>> Can this approach satisfy your requirements?
>>
>> Best,
>> Jark
>>
>> On Fri, 10 Feb 2023 at 11:21, Shengkai Fang <fskm...@gmail.com> wrote:
>>
>>> Hi Feng.
>>>
>>> I think your idea is very interesting!
>>>
>>> 1.
>>> I just wonder: after initializing the Catalog, will the Session reuse the
>>> same Catalog instance or build a new one for later usage? If we reuse the
>>> same Catalog, I think it's more like lazy initialization. I lean toward
>>> rebuilding a new one because it makes catalog jar hot updates easier
>>> for us.
>>>
>>> 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
>>> case, do we need to instantiate the Catalog immediately or defer it until
>>> first use?
>>>
>>> Best,
>>> Shengkai
>>>
>>> On Thu, Feb 9, 2023 at 8:13 PM Feng Jin <jinfeng1...@gmail.com> wrote:
>>>
>>>> Thanks for your reply.
>>>>
>>>> @Timo
>>>>
>>>>> 2) avoid the default in-memory catalog and offer their catalog before
>>>>> a TableEnvironment session starts
>>>>> 3) whether this can be disabled and SHOW CATALOGS can be used for
>>>>> listing first without having a default catalog.
>>>>
>>>> Regarding 2 and 3, I think this problem can be solved by introducing
>>>> catalog providers, so that users can control some of the default catalog
>>>> behavior.
>>>>
>>>>> We could also use the org.apache.flink.table.factories.Factory infra
>>>>> and allow catalog providers via pure string properties
>>>>
>>>> I think this is also very useful. Our usage scenarios usually involve
>>>> multi-cluster management, where it is also necessary to pass
>>>> different configurations through parameters.
>>>>
>>>> @Jark @Huang
>>>>
>>>>> About the lazy catalog initialization
>>>>
>>>> Our needs may be different. If these properties already exist in an
>>>> external system, especially when there may be thousands of these
>>>> catalog properties, I don't think it is necessary to register all
>>>> of them in the Flink env at startup. What we need is to
>>>> register a catalog when it is needed and to get its properties
>>>> from the external meta system.
>>>>
>>>>> It may be hard to avoid conflicts and duplicates between
>>>>> CatalogProvider and CatalogManager
>>>>
>>>> It is indeed easy to conflict. My idea is to separate out the
>>>> catalog management of the current CatalogManager as the default
>>>> CatalogProvider behavior and, at the same time, to allow only one
>>>> CatalogProvider in a Flink env. This may avoid catalog conflicts.
>>>>
>>>> Best,
>>>> Feng
>>>>
>>>> On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>>>
>>>>> Hi Feng,
>>>>> I agree with what Jark said. I think what you are looking for is lazy
>>>>> initialization.
>>>>>
>>>>> I don't think we should introduce the new interface CatalogProvider for
>>>>> lazy initialization. What we should do is store the catalog properties
>>>>> and initialize the catalog when we need it. Could you please describe
>>>>> some other scenarios, besides lazy initialization, in which we need the
>>>>> CatalogProvider?
>>>>>
>>>>> If we really need the CatalogProvider, I think it is better for it to be
>>>>> a single instance. Multiple instances are difficult to manage, and there
>>>>> can be name conflicts among providers.
>>>>>
>>>>> Best,
>>>>> Hang
>>>>>
>>>>> On Tue, Feb 7, 2023 at 10:48 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Feng,
>>>>>>
>>>>>> I think this feature makes a lot of sense. If I understand correctly,
>>>>>> what you are looking for is lazy catalog initialization.
>>>>>>
>>>>>> However, I have some concerns about introducing CatalogProvider, which
>>>>>> delegates catalog management to users. It may be hard to avoid
>>>>>> conflicts and duplicates between CatalogProvider and CatalogManager.
>>>>>> Is it possible to have a built-in CatalogProvider to instantiate
>>>>>> catalogs lazily?
>>>>>>
>>>>>> An idea in my mind is to introduce another catalog registration API
>>>>>> without instantiating the catalog, e.g., registerCatalog(String
>>>>>> catalogName, Map<String, String> catalogProperties).
>>>>>> The catalog information is stored in CatalogManager as pure strings.
>>>>>> The catalog is instantiated and initialized when used.
>>>>>>
>>>>>> This new API is very similar to other pure-string metadata
>>>>>> registration, such as "createTable(String path, TableDescriptor
>>>>>> descriptor)" and "createFunction(String path, String className,
>>>>>> List<ResourceUri> resourceUris)".
>>>>>>
>>>>>> Can this approach satisfy your requirement?
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Mon, 6 Feb 2023 at 22:53, Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Feng,
>>>>>>>
>>>>>>> this is indeed a good proposal.
>>>>>>>
>>>>>>> 1) It makes sense to improve the catalog listing for platform
>>>>>>> providers.
>>>>>>>
>>>>>>> 2) Other feedback from the past has shown that users would like to
>>>>>>> avoid the default in-memory catalog and offer their catalog before a
>>>>>>> TableEnvironment session starts.
>>>>>>>
>>>>>>> 3) Also we might reconsider whether a default catalog and default
>>>>>>> database make sense. Or whether this can be disabled and SHOW CATALOGS
>>>>>>> can be used for listing first without having a default catalog.
>>>>>>>
>>>>>>> What do you think about option 2 and 3?
>>>>>>>
>>>>>>> In any case, I would propose we pass a CatalogProvider to
>>>>>>> EnvironmentSettings and only allow a single instance. Catalogs should
>>>>>>> never shadow other catalogs.
>>>>>>>
>>>>>>> We could also use the org.apache.flink.table.factories.Factory infra
>>>>>>> and allow catalog providers via pure string properties. Not sure if we
>>>>>>> need this in the first version though.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Timo
>>>>>>>
>>>>>>> On 06.02.23 11:21, Feng Jin wrote:
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> The original discussion address is
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-30126
>>>>>>>>
>>>>>>>> Currently, Flink has access to many systems, including kafka, hive,
>>>>>>>> iceberg, hudi, elasticsearch, mysql... The corresponding catalog
>>>>>>>> names might be:
>>>>>>>> kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
>>>>>>>> iceberg_cluster2, elasticsearch_cluster1, mysql_database1_xxx,
>>>>>>>> mysql_database2_xxxx
>>>>>>>>
>>>>>>>> As the platform for Flink SQL jobs, we need to maintain the meta
>>>>>>>> information of each system in the company, and when a Flink job
>>>>>>>> starts, we need to register the catalogs with the Flink table
>>>>>>>> environment, so that users can use any table through the
>>>>>>>> env.executeSql interface.
>>>>>>>>
>>>>>>>> When we only have a small number of catalogs, we can register them
>>>>>>>> like this, but when there are thousands of catalogs, I think there
>>>>>>>> needs to be a dynamic loading mechanism so that we can register a
>>>>>>>> catalog only when needed, speed up the initialization of the table
>>>>>>>> environment, and avoid useless catalog registration.
>>>>>>>>
>>>>>>>> Preliminary thoughts:
>>>>>>>>
>>>>>>>> A new CatalogProvider interface can be added.
>>>>>>>> It contains two methods:
>>>>>>>> * listCatalogs(), which lists all the catalogs that the provider
>>>>>>>>   can supply
>>>>>>>> * getCatalog(), which gets a catalog instance by catalog name.
>>>>>>>>
>>>>>>>> ```java
>>>>>>>> public interface CatalogProvider {
>>>>>>>>
>>>>>>>>     default void initialize(ClassLoader classLoader, ReadableConfig config) {}
>>>>>>>>
>>>>>>>>     Optional<Catalog> getCatalog(String catalogName);
>>>>>>>>
>>>>>>>>     Set<String> listCatalogs();
>>>>>>>> }
>>>>>>>> ```
>>>>>>>>
>>>>>>>> The corresponding implementation in CatalogManager is as follows:
>>>>>>>>
>>>>>>>> ```java
>>>>>>>> public class CatalogManager {
>>>>>>>>     private @Nullable CatalogProvider catalogProvider;
>>>>>>>>
>>>>>>>>     private Map<String, Catalog> catalogs;
>>>>>>>>
>>>>>>>>     public void setCatalogProvider(CatalogProvider catalogProvider) {
>>>>>>>>         this.catalogProvider = catalogProvider;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public Optional<Catalog> getCatalog(String catalogName) {
>>>>>>>>         // Catalogs registered in the table environment take precedence.
>>>>>>>>         Catalog registered = catalogs.get(catalogName);
>>>>>>>>         if (registered != null) {
>>>>>>>>             return Optional.of(registered);
>>>>>>>>         }
>>>>>>>>         // If there is no corresponding catalog in catalogs,
>>>>>>>>         // get catalog by catalogProvider
>>>>>>>>         if (catalogProvider != null) {
>>>>>>>>             return catalogProvider.getCatalog(catalogName);
>>>>>>>>         }
>>>>>>>>         return Optional.empty();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Possible problems:
>>>>>>>>
>>>>>>>> 1. Catalog name conflict: how to choose when the registered catalog
>>>>>>>> and the catalog provided by the catalog-provider conflict?
>>>>>>>> I prefer tableEnv-registered ones over catalogs provided by the
>>>>>>>> catalog-provider. If the user wishes to reference the catalog
>>>>>>>> provided by the catalog-provider, they can unregister the catalog in
>>>>>>>> tableEnv through the `unregisterCatalog` interface.
>>>>>>>>
>>>>>>>> 2. Number of CatalogProviders: is it possible to have multiple
>>>>>>>> catalogProvider implementations?
>>>>>>>> I don't have a good idea about this at the moment. Supporting multiple
>>>>>>>> catalogProviders would be much more convenient, but
>>>>>>>> there may be catalog name conflicts between different
>>>>>>>> catalogProviders.
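
The precedence rule in problem 1 above (registered catalogs shadow provider catalogs until they are unregistered) can be illustrated with a minimal, self-contained sketch. Everything here is a hypothetical stand-in: `SimpleCatalog`, `MapProvider`, and `SimpleCatalogManager` are made up for illustration and are not Flink's real `Catalog` or `CatalogManager` classes.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for illustration; these are NOT Flink's real classes.
class CatalogLookupSketch {

    /** Minimal catalog stand-in that only remembers where it came from. */
    static final class SimpleCatalog {
        final String source;

        SimpleCatalog(String source) {
            this.source = source;
        }
    }

    /** Simplified, read-only provider modeled on the proposal in this thread. */
    interface CatalogProvider {
        Optional<SimpleCatalog> getCatalog(String catalogName);

        Set<String> listCatalogs();
    }

    /** Provider backed by a fixed map; stands in for an external metadata service. */
    static final class MapProvider implements CatalogProvider {
        private final Map<String, SimpleCatalog> catalogs;

        MapProvider(Map<String, SimpleCatalog> catalogs) {
            this.catalogs = new TreeMap<>(catalogs);
        }

        @Override
        public Optional<SimpleCatalog> getCatalog(String catalogName) {
            return Optional.ofNullable(catalogs.get(catalogName));
        }

        @Override
        public Set<String> listCatalogs() {
            return catalogs.keySet();
        }
    }

    /** Manager that prefers explicitly registered catalogs over provider catalogs. */
    static final class SimpleCatalogManager {
        private final Map<String, SimpleCatalog> registered = new ConcurrentHashMap<>();
        private final CatalogProvider provider; // may be null when no provider is configured

        SimpleCatalogManager(CatalogProvider provider) {
            this.provider = provider;
        }

        void registerCatalog(String name, SimpleCatalog catalog) {
            registered.put(name, catalog);
        }

        void unregisterCatalog(String name) {
            registered.remove(name);
        }

        Optional<SimpleCatalog> getCatalog(String name) {
            // Registered catalogs win; the provider is only a fallback.
            SimpleCatalog local = registered.get(name);
            if (local != null) {
                return Optional.of(local);
            }
            return provider == null ? Optional.empty() : provider.getCatalog(name);
        }

        Set<String> listCatalogs() {
            // Union of both sources; a name present in both is listed once.
            Set<String> names = new LinkedHashSet<>(registered.keySet());
            if (provider != null) {
                names.addAll(provider.listCatalogs());
            }
            return names;
        }
    }

    public static void main(String[] args) {
        MapProvider provider =
                new MapProvider(Map.of("hive_cluster1", new SimpleCatalog("provider")));
        SimpleCatalogManager manager = new SimpleCatalogManager(provider);

        manager.registerCatalog("hive_cluster1", new SimpleCatalog("registered"));
        System.out.println(manager.getCatalog("hive_cluster1").get().source);

        // Unregistering exposes the provider's catalog of the same name again.
        manager.unregisterCatalog("hive_cluster1");
        System.out.println(manager.getCatalog("hive_cluster1").get().source);
    }
}
```

Note that this sketch still has the inconsistency discussed later in the thread: the provider is read-only, so a catalog registered locally is never visible to the external metadata service.
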
>>>>>>>>
>>>>>>>> Looking forward to your reply; any feedback is appreciated!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Feng Jin
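
For reference, here is a minimal sketch of the lazy registration idea quoted above, where `registerCatalog(String catalogName, Map<String, String> catalogProperties)` stores only string properties and the catalog is instantiated on first use. The names `LazyCatalogRegistry` and `SimpleCatalog`, and the factory function passed to the constructor, are assumptions made for illustration; this is not Flink's API and not a proposed implementation.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; these are NOT Flink's real classes.
class LazyCatalogSketch {

    /** Minimal catalog stand-in that just keeps the properties it was built from. */
    static final class SimpleCatalog {
        final Map<String, String> properties;

        SimpleCatalog(Map<String, String> properties) {
            this.properties = properties;
        }
    }

    /** Stores catalog properties as pure strings and instantiates catalogs on demand. */
    static final class LazyCatalogRegistry {
        private final Map<String, Map<String, String>> registeredProperties =
                new ConcurrentHashMap<>();
        private final Map<String, SimpleCatalog> instances = new ConcurrentHashMap<>();
        // Assumed hook that turns string properties into a catalog instance.
        private final Function<Map<String, String>, SimpleCatalog> factory;

        LazyCatalogRegistry(Function<Map<String, String>, SimpleCatalog> factory) {
            this.factory = factory;
        }

        /** Registers the properties only; no catalog object is created here. */
        void registerCatalog(String catalogName, Map<String, String> catalogProperties) {
            Map<String, String> previous =
                    registeredProperties.putIfAbsent(catalogName, Map.copyOf(catalogProperties));
            if (previous != null) {
                throw new IllegalArgumentException("Catalog already registered: " + catalogName);
            }
        }

        /** Instantiates the catalog on first access and reuses it afterwards. */
        Optional<SimpleCatalog> getCatalog(String catalogName) {
            Map<String, String> props = registeredProperties.get(catalogName);
            if (props == null) {
                return Optional.empty();
            }
            return Optional.of(instances.computeIfAbsent(catalogName, n -> factory.apply(props)));
        }
    }

    public static void main(String[] args) {
        LazyCatalogRegistry registry = new LazyCatalogRegistry(props -> {
            System.out.println("instantiating catalog of type " + props.get("type"));
            return new SimpleCatalog(props);
        });

        registry.registerCatalog("hive_cluster1", Map.of("type", "hive"));
        System.out.println("registered, nothing instantiated yet");

        registry.getCatalog("hive_cluster1"); // first use triggers instantiation
        registry.getCatalog("hive_cluster1"); // later uses reuse the cached instance
    }
}
```

Caching the instance corresponds to the "reuse the same Catalog" option raised in the thread; dropping the `instances` map and calling the factory on every lookup would correspond to the "rebuild a new one" option, which trades startup cost per lookup for easier jar hot updates.
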