Thank you Feng, 

Feel free to start a FLIP proposal if you are interested. Looking forward to it!

Best,
Jark

> On Feb 10, 2023, at 15:44, Feng Jin <jinfeng1...@gmail.com> wrote:
> 
> @Shengkai
>> About the catalog jar hot updates
> 
> Currently we do not have a similar requirement, but if the catalog
> management interface is opened up, it would indeed make hot loading
> of the catalog jar possible.
> 
> 
>> do we need to instantiate the Catalog immediately or defer to the usage
> 
> I think this can stay the same as before.
> 
> 
> 
> @Jark
>> There only can be a single catalog manager in TableEnvironment.
> 
> Big +1 for this. This avoids conflicts and also meets the catalog
> persistence requirements.
> 
> 
> Best,
> Feng
> 
> On Fri, Feb 10, 2023 at 3:09 PM Jark Wu <imj...@gmail.com> wrote:
>> 
>> Hi Feng,
>> 
>> Conflicts and inconsistencies are still easy to hit even with only one
>> CatalogProvider, because CatalogProvider only provides read interfaces
>> (listCatalogs, getCatalog). For example, you may register a catalog X, but
>> can't list it because it's not in the external metadata service.
>> 
>> To avoid catalog conflicts and stay consistent, we can extract the catalog
>> management logic into a pluggable interface, including listCatalogs,
>> getCatalog, registerCatalog, unregisterCatalog, etc. The
>> current CatalogManager would be the default in-memory implementation, and you
>> can replace it with user-defined managers, such as:
>> - file-based: manages catalog information in local files, just like
>> how Presto/Trino manages catalogs
>> - metaservice-based: manages catalog information in an external
>> metadata service.
>> 
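A minimal sketch of the pluggable interface described above, in plain Java. The names `PluggableCatalogManager`, `CatalogInfo`, and `InMemoryCatalogManager` are hypothetical and are not existing Flink APIs; catalogs are modeled as plain string properties only to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a catalog definition; this is not Flink's Catalog class.
final class CatalogInfo {
    final String name;
    final Map<String, String> properties;

    CatalogInfo(String name, Map<String, String> properties) {
        this.name = name;
        // Defensive copy so later changes by the caller do not leak in.
        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
    }
}

// Hypothetical pluggable interface: one implementation owns both reads and writes.
interface PluggableCatalogManager {
    Set<String> listCatalogs();

    Optional<CatalogInfo> getCatalog(String name);

    void registerCatalog(CatalogInfo catalog);

    void unregisterCatalog(String name);
}

// In-memory variant, analogous to the default behavior mentioned in the mail.
final class InMemoryCatalogManager implements PluggableCatalogManager {
    private final Map<String, CatalogInfo> catalogs = new LinkedHashMap<>();

    @Override
    public Set<String> listCatalogs() {
        return Collections.unmodifiableSet(new LinkedHashSet<>(catalogs.keySet()));
    }

    @Override
    public Optional<CatalogInfo> getCatalog(String name) {
        return Optional.ofNullable(catalogs.get(name));
    }

    @Override
    public void registerCatalog(CatalogInfo catalog) {
        // Reject duplicates instead of silently shadowing an existing catalog.
        if (catalogs.putIfAbsent(catalog.name, catalog) != null) {
            throw new IllegalStateException("Catalog already exists: " + catalog.name);
        }
    }

    @Override
    public void unregisterCatalog(String name) {
        catalogs.remove(name);
    }
}
```

Because reads and writes go through the same implementation, a catalog that was registered can always be listed, which is the guarantee a read-only CatalogProvider cannot give. A file-based or metaservice-based manager would implement the same four methods against its own storage.
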
>> There can only be a single catalog manager in a TableEnvironment. This
>> guarantees data consistency and avoids conflicts. This approach can also
>> address another pain point of Flink SQL: catalog information is not persisted.
>> 
>> Can this approach satisfy your requirements?
>> 
>> Best,
>> Jark
>> 
>> 
>> 
>> 
>> 
>> On Fri, 10 Feb 2023 at 11:21, Shengkai Fang <fskm...@gmail.com> wrote:
>> 
>>> Hi Feng.
>>> 
>>> I think your idea is very interesting!
>>> 
>>> 1. I just wonder: after initializing the Catalog, will the Session reuse the
>>> same Catalog instance or build a new one for later usage? If we reuse the
>>> same Catalog, I think it's more like lazy initialization. I lean slightly
>>> toward building a new one because that makes catalog jar hot updates
>>> easier.
>>> 
>>> 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
>>> case, do we need to instantiate the Catalog immediately or defer it until
>>> it is used?
>>> 
>>> Best,
>>> Shengkai
>>> 
>>> On Thu, Feb 9, 2023 at 20:13, Feng Jin <jinfeng1...@gmail.com> wrote:
>>> 
>>>> Thanks for your reply.
>>>> 
>>>> @Timo
>>>> 
>>>>> 2) avoid the default in-memory catalog and offer their catalog before
>>>>> a TableEnvironment session starts
>>>>> 3) whether this can be disabled and SHOW CATALOGS can be used for
>>>>> listing first without having a default catalog.
>>>> 
>>>> 
>>>> Regarding 2 and 3, I think this problem can be solved by introducing
>>>> catalog providers, and users can control some default catalog
>>>> behavior.
>>>> 
>>>> 
>>>>> We could also use the org.apache.flink.table.factories.Factory infra
>>>>> and allow catalog providers via pure string properties
>>>> 
>>>> I think this is also very useful. Our usage scenarios usually involve
>>>> multi-cluster management, where we also need to pass
>>>> different configurations through parameters.
>>>> 
>>>> 
>>>> @Jark @Huang
>>>> 
>>>>> About the lazy catalog initialization
>>>> 
>>>> Our needs may be different. If these properties already exist in an
>>>> external system, especially when there may be thousands of these
>>>> catalog properties, I don't think it is necessary to register all
>>>> of them in the Flink env at startup. What we need is to be able to
>>>> register a catalog when it is needed and to fetch its properties
>>>> from the external meta system.
>>>> 
>>>> 
>>>>> It may be hard to avoid conflicts and duplicates between
>>>>> CatalogProvider and CatalogManager
>>>> 
>>>> Conflicts are indeed likely. My idea is to factor the
>>>> catalog management of the current CatalogManager out as the default
>>>> CatalogProvider behavior, and to allow only one CatalogProvider
>>>> per Flink env. This may avoid catalog conflicts.
>>>> 
>>>> 
>>>> Best,
>>>> Feng
>>>> 
>>>> On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>>> 
>>>>> Hi Feng,
>>>>> I agree with what Jark said. I think what you are looking for is lazy
>>>>> initialization.
>>>>> 
>>>>> I don't think we should introduce the new interface CatalogProvider for
>>>>> lazy initialization. What we should do is store the catalog properties
>>>>> and initialize the catalog when we need it. Could you please describe some
>>>>> other scenarios where we need the CatalogProvider besides lazy
>>>>> initialization?
>>>>> 
>>>>> If we really need the CatalogProvider, I think it is better for it to be a
>>>>> single instance. Multiple instances are difficult to manage and there can be
>>>>> name conflicts among providers.
>>>>> 
>>>>> Best,
>>>>> Hang
>>>>> 
>>>>> On Tue, Feb 7, 2023 at 10:48, Jark Wu <imj...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Feng,
>>>>>> 
>>>>>> I think this feature makes a lot of sense. If I understand correctly, what
>>>>>> you are looking for is lazy catalog initialization.
>>>>>> 
>>>>>> However, I have some concerns about introducing CatalogProvider, which
>>>>>> delegates catalog management to users. It may be hard to avoid conflicts
>>>>>> and duplicates between CatalogProvider and CatalogManager. Is it possible
>>>>>> to have a built-in CatalogProvider to instantiate catalogs lazily?
>>>>>> 
>>>>>> An idea in my mind is to introduce another catalog registration API
>>>>>> without instantiating the catalog, e.g., registerCatalog(String
>>>>>> catalogName, Map<String, String> catalogProperties). The catalog
>>>>>> information is stored in CatalogManager as pure strings. The catalog is
>>>>>> instantiated and initialized when used.
>>>>>> 
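A minimal sketch of this registration-without-instantiation idea, in plain Java. `LazyCatalogRegistry` and its `factory` function are hypothetical stand-ins (how Flink would actually create a catalog from string properties is not shown here); only the `registerCatalog(String, Map<String, String>)` shape comes from the proposal above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical registry: stores catalog properties as pure strings and
// builds the catalog object (type C) only on first access.
final class LazyCatalogRegistry<C> {
    private final Map<String, Map<String, String>> registeredProperties = new HashMap<>();
    private final Map<String, C> instances = new HashMap<>();
    private final Function<Map<String, String>, C> factory;

    LazyCatalogRegistry(Function<Map<String, String>, C> factory) {
        this.factory = factory;
    }

    // Cheap: only records the properties, nothing is instantiated here.
    void registerCatalog(String catalogName, Map<String, String> catalogProperties) {
        if (registeredProperties.containsKey(catalogName)) {
            throw new IllegalArgumentException("Catalog already registered: " + catalogName);
        }
        registeredProperties.put(catalogName, new HashMap<>(catalogProperties));
    }

    // Instantiates the catalog on first use and reuses the instance afterwards.
    Optional<C> getCatalog(String catalogName) {
        C existing = instances.get(catalogName);
        if (existing != null) {
            return Optional.of(existing);
        }
        Map<String, String> properties = registeredProperties.get(catalogName);
        if (properties == null) {
            return Optional.empty();
        }
        C created = factory.apply(properties);
        instances.put(catalogName, created);
        return Optional.of(created);
    }

    boolean isInstantiated(String catalogName) {
        return instances.containsKey(catalogName);
    }
}
```

This sketch caches the instance after the first lookup. Rebuilding the catalog on every lookup instead would be a different design choice with its own trade-offs.
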
>>>>>> This new API is very similar to other pure-string metadata registration,
>>>>>> such as "createTable(String path, TableDescriptor descriptor)" and
>>>>>> "createFunction(String path, String className, List<ResourceUri>
>>>>>> resourceUris)".
>>>>>> 
>>>>>> Can this approach satisfy your requirement?
>>>>>> 
>>>>>> Best,
>>>>>> Jark
>>>>>> 
>>>>>> On Mon, 6 Feb 2023 at 22:53, Timo Walther <twal...@apache.org> wrote:
>>>>>> 
>>>>>>> Hi Feng,
>>>>>>> 
>>>>>>> this is indeed a good proposal.
>>>>>>> 
>>>>>>> 1) It makes sense to improve the catalog listing for platform providers.
>>>>>>> 
>>>>>>> 2) Other feedback from the past has shown that users would like to avoid
>>>>>>> the default in-memory catalog and offer their catalog before a
>>>>>>> TableEnvironment session starts.
>>>>>>> 
>>>>>>> 3) Also we might reconsider whether a default catalog and default
>>>>>>> database make sense. Or whether this can be disabled and SHOW CATALOGS
>>>>>>> can be used for listing first without having a default catalog.
>>>>>>> 
>>>>>>> What do you think about options 2 and 3?
>>>>>>> 
>>>>>>> In any case, I would propose we pass a CatalogProvider to
>>>>>>> EnvironmentSettings and only allow a single instance. Catalogs should
>>>>>>> never shadow other catalogs.
>>>>>>> 
>>>>>>> We could also use the org.apache.flink.table.factories.Factory infra and
>>>>>>> allow catalog providers via pure string properties. Not sure if we need
>>>>>>> this in the first version though.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Timo
>>>>>>> 
>>>>>>> 
>>>>>>> On 06.02.23 11:21, Feng Jin wrote:
>>>>>>>> Hi everyone,
>>>>>>>> 
>>>>>>>> The original discussion is at
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-30126
>>>>>>>> 
>>>>>>>> Currently, Flink has access to many systems, including kafka, hive,
>>>>>>>> iceberg, hudi, elasticsearch, mysql...  The corresponding catalog names
>>>>>>>> might be:
>>>>>>>> kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
>>>>>>>> iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
>>>>>>>> mysql_database2_xxxx
>>>>>>>> 
>>>>>>>> As the platform for Flink SQL jobs, we need to maintain the meta
>>>>>>>> information of each system in the company, and when a Flink job
>>>>>>>> starts, we need to register the catalogs with the Flink table
>>>>>>>> environment, so that users can use any table through the
>>>>>>>> env.executeSql interface.
>>>>>>>> 
>>>>>>>> When we only have a small number of catalogs, we can register them like
>>>>>>>> this, but when there are thousands of catalogs, I think there
>>>>>>>> needs to be a dynamic loading mechanism so that we can register a catalog
>>>>>>>> only when needed, speed up the initialization of the table environment,
>>>>>>>> and avoid useless catalog registration.
>>>>>>>> 
>>>>>>>> Preliminary thoughts:
>>>>>>>> 
>>>>>>>> A new CatalogProvider interface can be added.
>>>>>>>> It contains two methods:
>>>>>>>> * listCatalogs(), which lists the names of all the catalogs that the
>>>>>>>> provider can supply
>>>>>>>> * getCatalog(), which gets a catalog instance by catalog name.
>>>>>>>> 
>>>>>>>> ```java
>>>>>>>> public interface CatalogProvider {
>>>>>>>> 
>>>>>>>>     default void initialize(ClassLoader classLoader, ReadableConfig config) {}
>>>>>>>> 
>>>>>>>>     Optional<Catalog> getCatalog(String catalogName);
>>>>>>>> 
>>>>>>>>     Set<String> listCatalogs();
>>>>>>>> }
>>>>>>>> ```
>>>>>>>> 
>>>>>>>> 
>>>>>>>> The corresponding implementation in CatalogManager is as follows:
>>>>>>>> 
>>>>>>>> ```java
>>>>>>>> public class CatalogManager {
>>>>>>>>     private @Nullable CatalogProvider catalogProvider;
>>>>>>>> 
>>>>>>>>     private Map<String, Catalog> catalogs;
>>>>>>>> 
>>>>>>>>     public void setCatalogProvider(CatalogProvider catalogProvider) {
>>>>>>>>         this.catalogProvider = catalogProvider;
>>>>>>>>     }
>>>>>>>> 
>>>>>>>>     public Optional<Catalog> getCatalog(String catalogName) {
>>>>>>>>         // Catalogs registered in the table environment are looked up first.
>>>>>>>>         Catalog registered = catalogs.get(catalogName);
>>>>>>>>         if (registered != null) {
>>>>>>>>             return Optional.of(registered);
>>>>>>>>         }
>>>>>>>>         // If there is no corresponding catalog in catalogs,
>>>>>>>>         // get catalog by catalogProvider
>>>>>>>>         if (catalogProvider != null) {
>>>>>>>>             return catalogProvider.getCatalog(catalogName);
>>>>>>>>         }
>>>>>>>>         return Optional.empty();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>> ```
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Possible problems:
>>>>>>>> 
>>>>>>>> 1. Catalog name conflicts: how to choose when a registered catalog
>>>>>>>> and a catalog provided by the catalog-provider conflict?
>>>>>>>> I prefer tableEnv-registered ones over catalogs provided by the
>>>>>>>> catalog-provider. If the user wishes to reference the catalog provided
>>>>>>>> by the catalog-provider, they can unregister the catalog in tableEnv
>>>>>>>> through the `unregisterCatalog` interface.
>>>>>>>> 
>>>>>>>> 2. Number of CatalogProviders: is it possible to have multiple
>>>>>>>> catalogProvider implementations?
>>>>>>>> I don't have a good answer for this at the moment. Supporting multiple
>>>>>>>> catalogProviders would be more convenient, but
>>>>>>>> there may be catalog name conflicts between different
>>>>>>>> catalogProviders.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Looking forward to your reply, any feedback is appreciated!
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Best.
>>>>>>>> 
>>>>>>>> Feng Jin
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>> 
