Hi all,

I updated FLIP based on your feedback:

1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
TableDescriptor.forManaged

2. Introduce log.scan.startup.mode (default initial) to Hybrid source.

3. Add description to miss dropped table.

Best,
Jingsong

On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi Ingo,
>
> Really appreciate your feedback.
>
> #1. The reason why we insist on using no "connector" option is that we
> want to bring the following design to users:
> - With the "connector" option, it is a mapping, unmanaged table.
> - Without the "connector" option, it is a managed table. It may be an
> Iceberg managed table, or may be a JDBC managed table, or may be a
> Flink managed table.
>
> #2. About:
> CREATE TABLE T (f0 INT);
> ALTER TABLE T SET ('connector' = '…');
>
> I think it is dangerous, even for a generic table. The managed table
> should prohibit it.
>
> #3. DDL and Table API
>
> You are right, Table Api should be a superset of SQL. There is no
> doubt that it should support BDT.
>
> Best,
> Jingsong
>
> On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk <i...@ververica.com> wrote:
> >
> > Hi Jingsong,
> >
> > thanks again for the answers. I think requiring catalogs to implement an
> > interface to support BDTs is something we'll need (though personally I
> > still prefer explicit DDL here over the "no connector option" approach).
> >
> > What about more edge cases like
> >
> > CREATE TABLE T (f0 INT);
> > ALTER TABLE T SET ('connector' = '…');
> >
> > This would have to first create the physical storage and then delete it
> > again, right?
> >
> > On a separate note, he FLIP currently only discusses SQL DDL, and you have
> > also mentioned
> >
> > > BDT only can be dropped by Flink SQL DDL now.
> >
> > Something Flink suffers from a lot is inconsistencies across APIs. I think
> > it is important that we support features on all major APIs, i.e. including
> > Table API.
> > For example for creating a BDT this would mean e.g. adding something like
> > #forManaged(…) to TableDescriptor.
> >
> >
> > Best
> > Ingo
> >
> > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > > Hi Ingo,
> > >
> > > I thought again.
> > >
> > > I'll try to sort out the current catalog behaviors.
> > > Actually, we can divide catalogs into three categories:
> > >
> > > 1. ExternalCatalog: it can only read or create a single table kind
> > > which connects to external storage. TableFactory is provided by
> > > Catalog, which can have nothing to do with Flink's Factory discovery
> > > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
> > > Catalog manages the life cycle of its **managed** tables, which means
> > > that creation and drop will affect the real physical storage. The DDL
> > > has no "connector" option.
> > >
> > > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and
> > > factories are created through Flink's factory discovery mechanism. At
> > > this time, the catalog is actually only a storage medium for saving
> > > schema and options, such as GenericInMemoryCatalog. Catalog only saves
> > > meta information and does not manage the underlying physical storage
> > > of tables. These tables are **unmanaged**. The DDL must have a
> > > "connector" option.
> > >
> > > 3. HybridCatalog: It can save both its own **managed** table and
> > > generic Flink **unmanaged** table, such as HiveCatalog.
> > >
> > > We want to use the "connector" option to distinguish whether it is
> > > managed or not.
> > >
> > > Now, consider the Flink managed table in this FLIP.
> > > a. ExternalCatalog can not support Flink managed tables.
> > > b. GenericCatalog can support Flink managed tables without the
> > > "connector" option.
> > > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
> > > support Flink managed tables:
> > > - with "connector" option in Flink dialect is unmanaged tables
> > > - Hive DDL in Hive dialect is Hive managed tables, the parser will add
> > > "connector = hive" automatically. At present, there are many
> > > differences between Flink DDL and Hive DDL, and even their features
> > > have many differences.
> > > - without "connector" option in Flink dialect is Flink managed tables.
> > >
> > > In this way, we can support Flink managed tables while maintaining
> > > compatibility.
> > >
> > > Anyway, we need introduce a "SupportsFlinkManagedTable" to catalog.
> > >
> > > ############## Back to your question #################
> > >
> > > > but we should make it clear that this is a limitation and probably
> > > document how users can clean up the underlying physical storage manually 
> > > in
> > > this case
> > >
> > > Yes, it's strange that the catalog should manage tables, but some
> > > catalogs don't have this ability.
> > > - For PersistentCatalog, the meta will continue until the underlying
> > > physical storage is deleted.
> > > - For InMemoryCatalog, yes, we should document it for the underlying
> > > physical storage of Flink managed tables.
> > >
> > > > the HiveCatalog doesn't list a 'connector' option for its tables.
> > >
> > > Actually, It can be divided into two steps: create and save:
> > > - When creating a table, the table seen by HiveCatalog must have
> > > "connector = hive", which is the hive table (Hive managed table). You
> > > can see the "HiveCatalog.isHiveTable".
> > > - When saving the table, it will remove the connector of the hive
> > > table. We can do this: with "connector" option is Flink generic table,
> > > without "connector" option is Hive table, with "flink-managed = true"
> > > is Flink managed table.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk <i...@ververica.com> wrote:
> > > >
> > > > Hi JingSong,
> > > >
> > > > thank you for the answers!
> > > >
> > > > > BDT only can be dropped by Flink SQL DDL now.
> > > >
> > > > Maybe I'm misunderstanding, but that's only true from the Flink side.
> > > What
> > > > I meant is that a table could disappear from a catalog entirely outside
> > > of
> > > > Flink. As a simple example, consider a catalog which represents an IMAP
> > > > mail server and each folder as a table. If a folder is deleted from the
> > > > mail account, the table would disappear, but Flink would have no way of
> > > > knowing that. I don't see a way around this problem, to be honest, but 
> > > > we
> > > > should make it clear that this is a limitation and probably document how
> > > > users can clean up the underlying physical storage manually in this 
> > > > case?
> > > >
> > > > > - Option 1: Create table without the connector option, the table will
> > > > > be forcibly translated to BDT.
> > > >
> > > > This would be a breaking change, right? If I remember correctly (but I
> > > > might not :-)), even the HiveCatalog doesn't list a 'connector' option
> > > for
> > > > its tables.
> > > >
> > > > This approach is also very implicit, and creating physical storage isn't
> > > > exactly "free", so I personally would favor one of the other approaches.
> > > > Option (2) would be explicit for the end user, while Option (3) is again
> > > > implicit for the user and only explicit for the catalog implementor, so 
> > > > I
> > > > kind of favor Option (2) because I feel that users should be aware of
> > > > creating a Flink-managed table.
> > > >
> > > > We also need to consider the upgrade path here: if a catalog exposes
> > > tables
> > > > without 'connector' options today, we need to make sure that once this
> > > FLIP
> > > > is implemented no errors are thrown because codepaths assume that
> > > physical
> > > > storage must exist for such tables (since they were created before the
> > > > FLIP).
> > > >
> > > >
> > > > Best
> > > > Ingo
> > > >
> > > > On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li <jingsongl...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Ingo and wenlong,
> > > > >
> > > > > Thanks for your feedback. Very good questions!
> > > > >
> > > > > (Built-in Dynamic Table is simplified as BDT)
> > > > >
> > > > > First, let's look at the following questions:
> > > > >
> > > > > 1. Does BDT want a separate catalog or can it be placed in all
> > > > > catalogs (such as InMemoryCatalog and HiveCatalog)?
> > > > >  - BDT wants the latter. Because in iceberg, we have seen that a
> > > > > separate catalog undoubtedly needs to recreate a set of catalogs. We
> > > > > often don't know whether it is Flink's HiveCatalog or iceberg's
> > > > > HiveCatalog. This brings not only duplication of work, but also
> > > > > confusion.
> > > > >  - How does catalog persist BDT? As a general Flink table, persist the
> > > > > schema and options of the table.
> > > > >
> > > > > 2. Is Flink's DDL mapping or real physical storage?
> > > > > - Mapping: creating and dropping tables only change the mapping
> > > > > relationship,
> > > > > - Physical storage: creating and dropping tables will actually delete
> > > > > the underlying storage
> > > > > - Status quo: the general connectors are all mapping, while the self
> > > > > managed tables of Catalog are real storage.
> > > > > - BDT wants real physical storage, because it can provide database
> > > > > level experience, and BDT wants to be orthogonal to catalog.
> > > > > Therefore, BDT is bound to break the current situation and become a
> > > > > new concept.
> > > > >
> > > > > Based on the above conclusion, let's look at your question.
> > > > >
> > > > > To Ingo:
> > > > >
> > > > > > if tables are dropped externally rather than through Flink SQL DDL,
> > > how
> > > > > would Flink be able to remove the physical storage for it.
> > > > >
> > > > > BDT only can be dropped by Flink SQL DDL now.
> > > > >
> > > > > To wenlong:
> > > > >
> > > > > > How the built-in table would be persisted in Catalog?
> > > > >
> > > > > Just like a general Flink table, persist the schema and options of the
> > > > > table.
> > > > >
> > > > > > Is it possible to read historical data from the file store first and
> > > > > then fetch new changes from the log store? something like a hybrid
> > > source,
> > > > > but I think we need a mechanism to get exactly-once semantic.
> > > > >
> > > > > This can be implemented, but we need to save the Kafka offset of the
> > > > > current checkpoint in the snapshot, so that we can accurately switch
> > > > > between file and log. But this is not in MVP.
> > > > >
> > > > > To Ingo and wenlong:
> > > > >
> > > > > > Currently a catalog can provide a default table factory and would be
> > > > > used as the top priority factory, what would happen after the default
> > > > > factory was introduced.
> > > > >
> > > > > - Option 1: Create table without the connector option, the table will
> > > > > be forcibly translated to BDT.
> > > > > - Option 2: Introduce new grammar, for example, "CREATE MANAGED
> > > > > TABLE...", this will separate from the default table of catalog.
> > > > > Catalog can define its own managed tables.
> > > > > - Option 3: Create table without the connector option, but introduce
> > > > > interface to Catalog, for example, "SupportsFlinkManagedTable". The
> > > > > catalog that can support BDT can implement
> > > > > it.(InMemoryCatalog,HiveCatalog). Catalogs that do not support BDT can
> > > > > implement their own managed tables.(IcebergCatalog, these catalogs do
> > > > > not even support other flink tables)
> > > > >
> > > > > Best,
> > > > > Jingsong
> > > > >
> > > > > On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl <wenlong88....@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Hi Jingsong, thanks for the proposal, providing a built-in storage
> > > > > solution
> > > > > > for users will make flink SQL much more easier to use in production.
> > > > > >
> > > > > > I have some questions which may be missed in the FLIP, but may be
> > > > > important
> > > > > > IMO:
> > > > > > 1. Is it possible to read historical data from the file store first
> > > and
> > > > > > then fetch new changes from the log store? something like a hybrid
> > > > > source,
> > > > > > but I think we need a mechanism to get exactly-once semantic.
> > > > > > 2. How the built-in table would be persisted in Catalog?
> > > > > > 3. Currently a catalog can provide a default table factory and would
> > > be
> > > > > > used as the top priority factory, what would happen after the 
> > > > > > default
> > > > > > factory was introduced.
> > > > > >
> > > > > > On Wed, 20 Oct 2021 at 19:35, Ingo Bürk <i...@ververica.com> wrote:
> > > > > >
> > > > > > > Hi Jingsong,
> > > > > > >
> > > > > > > thank you for writing up the proposal. The benefits such a
> > > mechanism
> > > > > will
> > > > > > > bring will be very valuable! I haven't yet looked into this in
> > > detail,
> > > > > but
> > > > > > > one question came to my mind immediately:
> > > > > > >
> > > > > > > The DDL for these tables seems to rely on there not being a
> > > 'connector'
> > > > > > > option. However, catalogs can provide a custom factory, and thus
> > > tables
> > > > > > > don't necessarily need to contain such an option already today. 
> > > > > > > How
> > > > > will
> > > > > > > this interact / work with catalogs? I think there are more points
> > > > > regarding
> > > > > > > interaction with catalogs, e.g. if tables are dropped externally
> > > rather
> > > > > > > than through Flink SQL DDL, how would Flink be able to remove the
> > > > > physical
> > > > > > > storage for it.
> > > > > > >
> > > > > > >
> > > > > > > Best
> > > > > > > Ingo
> > > > > > >
> > > > > > > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li <
> > > jingsongl...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Kurt and I propose to introduce built-in storage support for
> > > dynamic
> > > > > > > > table, a truly unified changelog & table representation, from
> > > Flink
> > > > > > > > SQL’s perspective. We believe this kind of storage will improve
> > > the
> > > > > > > > usability a lot.
> > > > > > > >
> > > > > > > > We want to highlight some characteristics about this storage:
> > > > > > > >
> > > > > > > > - It’s a built-in storage for Flink SQL
> > > > > > > > ** Improve usability issues
> > > > > > > > ** Flink DDL is no longer just a mapping, but a real creation 
> > > > > > > > for
> > > > > these
> > > > > > > > tables
> > > > > > > > ** Masks & abstracts the underlying technical details, no
> > > annoying
> > > > > > > options
> > > > > > > >
> > > > > > > > - Supports subsecond streaming write & consumption
> > > > > > > > ** It could be backed by a service-oriented message queue (Like
> > > > > Kafka)
> > > > > > > > ** High throughput scan capability
> > > > > > > > ** Filesystem with columnar formats would be an ideal choice 
> > > > > > > > just
> > > > > like
> > > > > > > > iceberg/hudi does.
> > > > > > > >
> > > > > > > > - More importantly, in order to solve the cognitive bar, storage
> > > > > needs
> > > > > > > > to automatically address various Insert/Update/Delete inputs and
> > > > > table
> > > > > > > > definitions
> > > > > > > > ** Receive any type of changelog
> > > > > > > > ** Table can have primary key or no primary key
> > > > > > > >
> > > > > > > > Looking forward to your feedback.
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jingsong Lee
> > > > > > > >
> > > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best, Jingsong Lee
> > > > >
> > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
>
>
>
> --
> Best, Jingsong Lee



-- 
Best, Jingsong Lee

Reply via email to