Hi JingSong,

thank you for the answers!

> BDT only can be dropped by Flink SQL DDL now.

Maybe I'm misunderstanding, but that's only true from the Flink side. What
I meant is that a table could disappear from a catalog entirely outside of
Flink. As a simple example, consider a catalog which represents an IMAP
mail server and each folder as a table. If a folder is deleted from the
mail account, the table would disappear, but Flink would have no way of
knowing that. I don't see a way around this problem, to be honest, but we
should make it clear that this is a limitation, and probably document how
users can clean up the underlying physical storage manually in this case.
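
As a rough illustration of where such a manual cleanup could start, here is
a minimal Java sketch that compares the tables a catalog still lists with
the tables for which physical storage still exists. The class and method
names are hypothetical and not part of Flink or the FLIP, and how the two
sets are obtained is deliberately left open.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration only; not a Flink API. */
final class OrphanedStorageSketch {
    private OrphanedStorageSketch() {}

    /**
     * Returns the names of tables that still have physical storage but are
     * no longer listed by the catalog (e.g. because they were removed
     * outside of Flink).
     */
    static Set<String> findOrphans(Set<String> tablesInCatalog,
                                   Set<String> tablesWithStorage) {
        // Start from everything that has storage, then drop what the
        // catalog still knows about. What remains is orphaned.
        Set<String> orphans = new TreeSet<>(tablesWithStorage);
        orphans.removeAll(tablesInCatalog);
        return orphans;
    }
}
```

In the IMAP example, a deleted folder would show up in the returned set, and
the user could then decide whether to remove its storage.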

> - Option 1: Create table without the connector option, the table will
> be forcibly translated to BDT.

This would be a breaking change, right? If I remember correctly (but I
might not :-)), even the HiveCatalog doesn't list a 'connector' option for
its tables.

This approach is also very implicit, and creating physical storage isn't
exactly "free", so I personally would favor one of the other approaches.
Option (2) would be explicit for the end user, while Option (3) is again
implicit for the user and only explicit for the catalog implementor, so I
kind of favor Option (2) because I feel that users should be aware of
creating a Flink-managed table.
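
To spell out how the three options differ, here is a small Java sketch of
the rule each one would use to decide whether a table is Flink-managed. Only
the 'connector' option and the interface name SupportsFlinkManagedTable come
from this thread. Everything else (the class and method names, and the
boolean flag standing in for a "CREATE MANAGED TABLE" statement) is made up
for illustration and is not how Flink implements anything today.

```java
import java.util.Map;

/** Illustration only; none of these types are real Flink classes. */
final class ManagedTableOptionsSketch {
    private ManagedTableOptionsSketch() {}

    /** Stand-in for Flink's catalog abstraction. */
    interface Catalog {}

    /** Marker interface as suggested for Option 3. */
    interface SupportsFlinkManagedTable {}

    /** A catalog that opts in to Flink-managed tables. */
    static final class OptInCatalog implements Catalog, SupportsFlinkManagedTable {}

    /** A catalog that does not opt in (it may have its own managed tables). */
    static final class OtherCatalog implements Catalog {}

    static boolean hasConnector(Map<String, String> options) {
        return options.containsKey("connector");
    }

    /** Option 1: any table without 'connector' is implicitly managed. */
    static boolean managedUnderOption1(Map<String, String> options) {
        return !hasConnector(options);
    }

    /** Option 2: only tables explicitly declared as managed are managed. */
    static boolean managedUnderOption2(boolean declaredManaged) {
        return declaredManaged;
    }

    /** Option 3: no 'connector', and the catalog has opted in. */
    static boolean managedUnderOption3(Catalog catalog, Map<String, String> options) {
        return catalog instanceof SupportsFlinkManagedTable && !hasConnector(options);
    }
}
```

Under Option 1 the decision depends only on what the user left out, under
Option 3 on what the user left out plus a catalog property the user may not
know about, and only under Option 2 on something the user wrote explicitly.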

We also need to consider the upgrade path here: if a catalog exposes tables
without 'connector' options today, we need to make sure that once this FLIP
is implemented no errors are thrown because codepaths assume that physical
storage must exist for such tables (since they were created before the
FLIP).
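
One way to picture a tolerant codepath: when dropping a table, treat missing
storage as a normal case rather than an error. The following Java sketch is
only an assumption about what such a guard could look like. The class and
method names are hypothetical, and it handles just a single file or an empty
directory, whereas real table storage would need a recursive delete.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustration only; not a Flink API. */
final class LegacyTableDropSketch {
    private LegacyTableDropSketch() {}

    /**
     * Removes the storage location if it exists.
     *
     * @return true if something was deleted, false if there was nothing to
     *     delete (e.g. the table predates the FLIP and never had storage)
     */
    static boolean dropStorageIfPresent(Path storagePath) {
        try {
            // deleteIfExists returns false instead of throwing when the
            // path is absent, so pre-existing tables do not cause errors.
            return Files.deleteIfExists(storagePath);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```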


Best
Ingo

On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Ingo and wenlong,
>
> Thanks for your feedback. Very good questions!
>
> (Built-in Dynamic Table is simplified as BDT)
>
> First, let's look at the following questions:
>
> 1. Does BDT want a separate catalog or can it be placed in all
> catalogs (such as InMemoryCatalog and HiveCatalog)?
>  - BDT wants the latter. Because in iceberg, we have seen that a
> separate catalog undoubtedly needs to recreate a set of catalogs. We
> often don't know whether it is Flink's HiveCatalog or iceberg's
> HiveCatalog. This brings not only duplication of work, but also
> confusion.
>  - How does catalog persist BDT? As a general Flink table, persist the
> schema and options of the table.
>
> 2. Is Flink's DDL mapping or real physical storage?
> - Mapping: creating and dropping tables only change the mapping
> relationship,
> - Physical storage: creating and dropping tables will actually delete
> the underlying storage
> - Status quo: the general connectors are all mapping, while the self
> managed tables of Catalog are real storage.
> - BDT wants real physical storage, because it can provide database
> level experience, and BDT wants to be orthogonal to catalog.
> Therefore, BDT is bound to break the current situation and become a
> new concept.
>
> Based on the above conclusion, let's look at your question.
>
> To Ingo:
>
> > if tables are dropped externally rather than through Flink SQL DDL, how
> > would Flink be able to remove the physical storage for it.
>
> BDT only can be dropped by Flink SQL DDL now.
>
> To wenlong:
>
> > How the built-in table would be persisted in Catalog?
>
> Just like a general Flink table, persist the schema and options of the
> table.
>
> > Is it possible to read historical data from the file store first and
> > then fetch new changes from the log store? something like a hybrid source,
> > but I think we need a mechanism to get exactly-once semantic.
>
> This can be implemented, but we need to save the Kafka offset of the
> current checkpoint in the snapshot, so that we can accurately switch
> between file and log. But this is not in MVP.
>
> To Ingo and wenlong:
>
> > Currently a catalog can provide a default table factory and would be
> > used as the top priority factory, what would happen after the default
> > factory was introduced.
>
> - Option 1: Create table without the connector option, the table will
> be forcibly translated to BDT.
> - Option 2: Introduce new grammar, for example, "CREATE MANAGED
> TABLE...", this will separate from the default table of catalog.
> Catalog can define its own managed tables.
> - Option 3: Create table without the connector option, but introduce an
> interface to Catalog, for example, "SupportsFlinkManagedTable". The
> catalogs that can support BDT can implement it (InMemoryCatalog,
> HiveCatalog). Catalogs that do not support BDT can implement their own
> managed tables (IcebergCatalog; these catalogs do not even support other
> Flink tables).
>
> Best,
> Jingsong
>
> On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl <wenlong88....@gmail.com> wrote:
> >
> > Hi Jingsong, thanks for the proposal, providing a built-in storage solution
> > for users will make flink SQL much more easier to use in production.
> >
> > I have some questions which may be missed in the FLIP, but may be important
> > IMO:
> > 1. Is it possible to read historical data from the file store first and
> > then fetch new changes from the log store? something like a hybrid source,
> > but I think we need a mechanism to get exactly-once semantic.
> > 2. How the built-in table would be persisted in Catalog?
> > 3. Currently a catalog can provide a default table factory and would be
> > used as the top priority factory, what would happen after the default
> > factory was introduced.
> >
> > On Wed, 20 Oct 2021 at 19:35, Ingo Bürk <i...@ververica.com> wrote:
> >
> > > Hi Jingsong,
> > >
> > > thank you for writing up the proposal. The benefits such a mechanism will
> > > bring will be very valuable! I haven't yet looked into this in detail, but
> > > one question came to my mind immediately:
> > >
> > > The DDL for these tables seems to rely on there not being a 'connector'
> > > option. However, catalogs can provide a custom factory, and thus tables
> > > don't necessarily need to contain such an option already today. How will
> > > this interact / work with catalogs? I think there are more points regarding
> > > interaction with catalogs, e.g. if tables are dropped externally rather
> > > than through Flink SQL DDL, how would Flink be able to remove the physical
> > > storage for it.
> > >
> > >
> > > Best
> > > Ingo
> > >
> > > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Kurt and I propose to introduce built-in storage support for dynamic
> > > > table, a truly unified changelog & table representation, from Flink
> > > > SQL’s perspective. We believe this kind of storage will improve the
> > > > usability a lot.
> > > >
> > > > We want to highlight some characteristics about this storage:
> > > >
> > > > - It’s a built-in storage for Flink SQL
> > > > ** Improve usability issues
> > > > ** Flink DDL is no longer just a mapping, but a real creation for these
> > > > tables
> > > > ** Masks & abstracts the underlying technical details, no annoying
> > > > options
> > > >
> > > > - Supports subsecond streaming write & consumption
> > > > ** It could be backed by a service-oriented message queue (Like Kafka)
> > > > ** High throughput scan capability
> > > > ** Filesystem with columnar formats would be an ideal choice just like
> > > > iceberg/hudi does.
> > > >
> > > > - More importantly, in order to solve the cognitive bar, storage needs
> > > > to automatically address various Insert/Update/Delete inputs and table
> > > > definitions
> > > > ** Receive any type of changelog
> > > > ** Table can have primary key or no primary key
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > >
>
>
>
> --
> Best, Jingsong Lee
>
