Hi Jingsong, Thanks for creating this FLIP. I don't have a lot to add because I am not very familiar with the SQL components. While reading the FLIP I was wondering what would we need in Flink to build something like the BDT feature outside of Flink as a kind of extension? Would something like this be possible? Maybe the answer is a quick no ;-)
Cheers, Till On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li <jingsongl...@gmail.com> wrote: > Hi all, > > I updated FLIP based on your feedback: > > 1. Introduce interfaces: GenericCatalog, ManagedTableFactory, > TableDescriptor.forManaged > > 2. Introduce log.scan.startup.mode (default initial) to Hybrid source. > > 3. Add description to miss dropped table. > > Best, > Jingsong > > On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li <jingsongl...@gmail.com> > wrote: > > > > Hi Ingo, > > > > Really appreciate your feedback. > > > > #1. The reason why we insist on using no "connector" option is that we > > want to bring the following design to users: > > - With the "connector" option, it is a mapping, unmanaged table. > > - Without the "connector" option, it is a managed table. It may be an > > Iceberg managed table, or may be a JDBC managed table, or may be a > > Flink managed table. > > > > #2. About: > > CREATE TABLE T (f0 INT); > > ALTER TABLE T SET ('connector' = '…'); > > > > I think it is dangerous, even for a generic table. The managed table > > should prohibit it. > > > > #3. DDL and Table API > > > > You are right, Table Api should be a superset of SQL. There is no > > doubt that it should support BDT. > > > > Best, > > Jingsong > > > > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk <i...@ververica.com> wrote: > > > > > > Hi Jingsong, > > > > > > thanks again for the answers. I think requiring catalogs to implement > an > > > interface to support BDTs is something we'll need (though personally I > > > still prefer explicit DDL here over the "no connector option" > approach). > > > > > > What about more edge cases like > > > > > > CREATE TABLE T (f0 INT); > > > ALTER TABLE T SET ('connector' = '…'); > > > > > > This would have to first create the physical storage and then delete it > > > again, right? > > > > > > On a separate note, he FLIP currently only discusses SQL DDL, and you > have > > > also mentioned > > > > > > > BDT only can be dropped by Flink SQL DDL now. > > > > > > Something Flink suffers from a lot is inconsistencies across APIs. I > think > > > it is important that we support features on all major APIs, i.e. > including > > > Table API. > > > For example for creating a BDT this would mean e.g. adding something > like > > > #forManaged(…) to TableDescriptor. > > > > > > > > > Best > > > Ingo > > > > > > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li <jingsongl...@gmail.com> > wrote: > > > > > > > Hi Ingo, > > > > > > > > I thought again. > > > > > > > > I'll try to sort out the current catalog behaviors. > > > > Actually, we can divide catalogs into three categories: > > > > > > > > 1. ExternalCatalog: it can only read or create a single table kind > > > > which connects to external storage. TableFactory is provided by > > > > Catalog, which can have nothing to do with Flink's Factory discovery > > > > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc. > > > > Catalog manages the life cycle of its **managed** tables, which means > > > > that creation and drop will affect the real physical storage. The DDL > > > > has no "connector" option. > > > > > > > > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and > > > > factories are created through Flink's factory discovery mechanism. At > > > > this time, the catalog is actually only a storage medium for saving > > > > schema and options, such as GenericInMemoryCatalog. Catalog only > saves > > > > meta information and does not manage the underlying physical storage > > > > of tables. These tables are **unmanaged**. The DDL must have a > > > > "connector" option. > > > > > > > > 3. HybridCatalog: It can save both its own **managed** table and > > > > generic Flink **unmanaged** table, such as HiveCatalog. > > > > > > > > We want to use the "connector" option to distinguish whether it is > > > > managed or not. > > > > > > > > Now, consider the Flink managed table in this FLIP. > > > > a. ExternalCatalog can not support Flink managed tables. > > > > b. GenericCatalog can support Flink managed tables without the > > > > "connector" option. > > > > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog > to > > > > support Flink managed tables: > > > > - with "connector" option in Flink dialect is unmanaged tables > > > > - Hive DDL in Hive dialect is Hive managed tables, the parser will > add > > > > "connector = hive" automatically. At present, there are many > > > > differences between Flink DDL and Hive DDL, and even their features > > > > have many differences. > > > > - without "connector" option in Flink dialect is Flink managed > tables. > > > > > > > > In this way, we can support Flink managed tables while maintaining > > > > compatibility. > > > > > > > > Anyway, we need introduce a "SupportsFlinkManagedTable" to catalog. > > > > > > > > ############## Back to your question ################# > > > > > > > > > but we should make it clear that this is a limitation and probably > > > > document how users can clean up the underlying physical storage > manually in > > > > this case > > > > > > > > Yes, it's strange that the catalog should manage tables, but some > > > > catalogs don't have this ability. > > > > - For PersistentCatalog, the meta will continue until the underlying > > > > physical storage is deleted. > > > > - For InMemoryCatalog, yes, we should document it for the underlying > > > > physical storage of Flink managed tables. > > > > > > > > > the HiveCatalog doesn't list a 'connector' option for its tables. > > > > > > > > Actually, It can be divided into two steps: create and save: > > > > - When creating a table, the table seen by HiveCatalog must have > > > > "connector = hive", which is the hive table (Hive managed table). You > > > > can see the "HiveCatalog.isHiveTable". > > > > - When saving the table, it will remove the connector of the hive > > > > table. We can do this: with "connector" option is Flink generic > table, > > > > without "connector" option is Hive table, with "flink-managed = true" > > > > is Flink managed table. > > > > > > > > Best, > > > > Jingsong Lee > > > > > > > > On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk <i...@ververica.com> > wrote: > > > > > > > > > > Hi JingSong, > > > > > > > > > > thank you for the answers! > > > > > > > > > > > BDT only can be dropped by Flink SQL DDL now. > > > > > > > > > > Maybe I'm misunderstanding, but that's only true from the Flink > side. > > > > What > > > > > I meant is that a table could disappear from a catalog entirely > outside > > > > of > > > > > Flink. As a simple example, consider a catalog which represents an > IMAP > > > > > mail server and each folder as a table. If a folder is deleted > from the > > > > > mail account, the table would disappear, but Flink would have no > way of > > > > > knowing that. I don't see a way around this problem, to be honest, > but we > > > > > should make it clear that this is a limitation and probably > document how > > > > > users can clean up the underlying physical storage manually in > this case? > > > > > > > > > > > - Option 1: Create table without the connector option, the table > will > > > > > > be forcibly translated to BDT. > > > > > > > > > > This would be a breaking change, right? If I remember correctly > (but I > > > > > might not :-)), even the HiveCatalog doesn't list a 'connector' > option > > > > for > > > > > its tables. > > > > > > > > > > This approach is also very implicit, and creating physical storage > isn't > > > > > exactly "free", so I personally would favor one of the other > approaches. > > > > > Option (2) would be explicit for the end user, while Option (3) is > again > > > > > implicit for the user and only explicit for the catalog > implementor, so I > > > > > kind of favor Option (2) because I feel that users should be aware > of > > > > > creating a Flink-managed table. > > > > > > > > > > We also need to consider the upgrade path here: if a catalog > exposes > > > > tables > > > > > without 'connector' options today, we need to make sure that once > this > > > > FLIP > > > > > is implemented no errors are thrown because codepaths assume that > > > > physical > > > > > storage must exist for such tables (since they were created before > the > > > > > FLIP). > > > > > > > > > > > > > > > Best > > > > > Ingo > > > > > > > > > > On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li < > jingsongl...@gmail.com> > > > > wrote: > > > > > > > > > > > Hi Ingo and wenlong, > > > > > > > > > > > > Thanks for your feedback. Very good questions! > > > > > > > > > > > > (Built-in Dynamic Table is simplified as BDT) > > > > > > > > > > > > First, let's look at the following questions: > > > > > > > > > > > > 1. Does BDT want a separate catalog or can it be placed in all > > > > > > catalogs (such as InMemoryCatalog and HiveCatalog)? > > > > > > - BDT wants the latter. Because in iceberg, we have seen that a > > > > > > separate catalog undoubtedly needs to recreate a set of > catalogs. We > > > > > > often don't know whether it is Flink's HiveCatalog or iceberg's > > > > > > HiveCatalog. This brings not only duplication of work, but also > > > > > > confusion. > > > > > > - How does catalog persist BDT? As a general Flink table, > persist the > > > > > > schema and options of the table. > > > > > > > > > > > > 2. Is Flink's DDL mapping or real physical storage? > > > > > > - Mapping: creating and dropping tables only change the mapping > > > > > > relationship, > > > > > > - Physical storage: creating and dropping tables will actually > delete > > > > > > the underlying storage > > > > > > - Status quo: the general connectors are all mapping, while the > self > > > > > > managed tables of Catalog are real storage. > > > > > > - BDT wants real physical storage, because it can provide > database > > > > > > level experience, and BDT wants to be orthogonal to catalog. > > > > > > Therefore, BDT is bound to break the current situation and > become a > > > > > > new concept. > > > > > > > > > > > > Based on the above conclusion, let's look at your question. > > > > > > > > > > > > To Ingo: > > > > > > > > > > > > > if tables are dropped externally rather than through Flink SQL > DDL, > > > > how > > > > > > would Flink be able to remove the physical storage for it. > > > > > > > > > > > > BDT only can be dropped by Flink SQL DDL now. > > > > > > > > > > > > To wenlong: > > > > > > > > > > > > > How the built-in table would be persisted in Catalog? > > > > > > > > > > > > Just like a general Flink table, persist the schema and options > of the > > > > > > table. > > > > > > > > > > > > > Is it possible to read historical data from the file store > first and > > > > > > then fetch new changes from the log store? something like a > hybrid > > > > source, > > > > > > but I think we need a mechanism to get exactly-once semantic. > > > > > > > > > > > > This can be implemented, but we need to save the Kafka offset of > the > > > > > > current checkpoint in the snapshot, so that we can accurately > switch > > > > > > between file and log. But this is not in MVP. > > > > > > > > > > > > To Ingo and wenlong: > > > > > > > > > > > > > Currently a catalog can provide a default table factory and > would be > > > > > > used as the top priority factory, what would happen after the > default > > > > > > factory was introduced. > > > > > > > > > > > > - Option 1: Create table without the connector option, the table > will > > > > > > be forcibly translated to BDT. > > > > > > - Option 2: Introduce new grammar, for example, "CREATE MANAGED > > > > > > TABLE...", this will separate from the default table of catalog. > > > > > > Catalog can define its own managed tables. > > > > > > - Option 3: Create table without the connector option, but > introduce > > > > > > interface to Catalog, for example, "SupportsFlinkManagedTable". > The > > > > > > catalog that can support BDT can implement > > > > > > it.(InMemoryCatalog,HiveCatalog). Catalogs that do not support > BDT can > > > > > > implement their own managed tables.(IcebergCatalog, these > catalogs do > > > > > > not even support other flink tables) > > > > > > > > > > > > Best, > > > > > > Jingsong > > > > > > > > > > > > On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl < > wenlong88....@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > Hi Jingsong, thanks for the proposal, providing a built-in > storage > > > > > > solution > > > > > > > for users will make flink SQL much more easier to use in > production. > > > > > > > > > > > > > > I have some questions which may be missed in the FLIP, but may > be > > > > > > important > > > > > > > IMO: > > > > > > > 1. Is it possible to read historical data from the file store > first > > > > and > > > > > > > then fetch new changes from the log store? something like a > hybrid > > > > > > source, > > > > > > > but I think we need a mechanism to get exactly-once semantic. > > > > > > > 2. How the built-in table would be persisted in Catalog? > > > > > > > 3. Currently a catalog can provide a default table factory and > would > > > > be > > > > > > > used as the top priority factory, what would happen after the > default > > > > > > > factory was introduced. > > > > > > > > > > > > > > On Wed, 20 Oct 2021 at 19:35, Ingo Bürk <i...@ververica.com> > wrote: > > > > > > > > > > > > > > > Hi Jingsong, > > > > > > > > > > > > > > > > thank you for writing up the proposal. The benefits such a > > > > mechanism > > > > > > will > > > > > > > > bring will be very valuable! I haven't yet looked into this > in > > > > detail, > > > > > > but > > > > > > > > one question came to my mind immediately: > > > > > > > > > > > > > > > > The DDL for these tables seems to rely on there not being a > > > > 'connector' > > > > > > > > option. However, catalogs can provide a custom factory, and > thus > > > > tables > > > > > > > > don't necessarily need to contain such an option already > today. How > > > > > > will > > > > > > > > this interact / work with catalogs? I think there are more > points > > > > > > regarding > > > > > > > > interaction with catalogs, e.g. if tables are dropped > externally > > > > rather > > > > > > > > than through Flink SQL DDL, how would Flink be able to > remove the > > > > > > physical > > > > > > > > storage for it. > > > > > > > > > > > > > > > > > > > > > > > > Best > > > > > > > > Ingo > > > > > > > > > > > > > > > > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li < > > > > jingsongl...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > Kurt and I propose to introduce built-in storage support > for > > > > dynamic > > > > > > > > > table, a truly unified changelog & table representation, > from > > > > Flink > > > > > > > > > SQL’s perspective. We believe this kind of storage will > improve > > > > the > > > > > > > > > usability a lot. > > > > > > > > > > > > > > > > > > We want to highlight some characteristics about this > storage: > > > > > > > > > > > > > > > > > > - It’s a built-in storage for Flink SQL > > > > > > > > > ** Improve usability issues > > > > > > > > > ** Flink DDL is no longer just a mapping, but a real > creation for > > > > > > these > > > > > > > > > tables > > > > > > > > > ** Masks & abstracts the underlying technical details, no > > > > annoying > > > > > > > > options > > > > > > > > > > > > > > > > > > - Supports subsecond streaming write & consumption > > > > > > > > > ** It could be backed by a service-oriented message queue > (Like > > > > > > Kafka) > > > > > > > > > ** High throughput scan capability > > > > > > > > > ** Filesystem with columnar formats would be an ideal > choice just > > > > > > like > > > > > > > > > iceberg/hudi does. > > > > > > > > > > > > > > > > > > - More importantly, in order to solve the cognitive bar, > storage > > > > > > needs > > > > > > > > > to automatically address various Insert/Update/Delete > inputs and > > > > > > table > > > > > > > > > definitions > > > > > > > > > ** Receive any type of changelog > > > > > > > > > ** Table can have primary key or no primary key > > > > > > > > > > > > > > > > > > Looking forward to your feedback. > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > Jingsong Lee > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Best, Jingsong Lee > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best, Jingsong Lee > > > > > > > > > > > > -- > > Best, Jingsong Lee > > > > -- > Best, Jingsong Lee >