Hi Ingo and wenlong,

Thanks for your feedback. Very good questions!

(Built-in Dynamic Table is abbreviated as BDT below.)

First, let's look at the following questions:

1. Does BDT want a separate catalog, or can it be placed in any
catalog (such as InMemoryCatalog and HiveCatalog)?
 - BDT wants the latter. With Iceberg we have seen that a separate
catalog means re-implementing a whole set of catalogs, and users often
cannot tell whether they are dealing with Flink's HiveCatalog or
Iceberg's HiveCatalog. This brings not only duplicated work, but also
confusion.
 - How does the catalog persist a BDT? Like a general Flink table: it
persists the schema and options of the table.

2. Is Flink's DDL a mapping or real physical storage?
- Mapping: creating and dropping tables only changes the mapping
relationship.
- Physical storage: creating and dropping tables actually creates and
deletes the underlying storage.
- Status quo: the general connectors are all mappings, while the
self-managed tables of a catalog are real storage.
- BDT wants real physical storage, because it can provide a
database-level experience, and BDT wants to be orthogonal to the
catalog. Therefore, BDT is bound to break the status quo and become a
new concept.
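
To make the difference concrete, here is a minimal sketch of the two
DROP semantics. It is a toy model, not Flink API: the class, its
fields, and the "managed/" location prefix are hypothetical names made
up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model, not Flink API: contrasts mapping vs. physical DDL.
class DdlSemanticsSketch {
    // Catalog metadata: table name -> storage location.
    final Map<String, String> catalog = new HashMap<>();
    // Stand-in for the external system or filesystem holding the data.
    final Set<String> storage = new HashSet<>();

    // Mapping semantics: CREATE only registers an existing location.
    void createMapped(String table, String existingLocation) {
        catalog.put(table, existingLocation);
    }

    // Mapping semantics: DROP only removes the catalog entry.
    void dropMapped(String table) {
        catalog.remove(table);
    }

    // Physical semantics (what BDT wants): CREATE allocates storage.
    void createManaged(String table) {
        String location = "managed/" + table; // assumed layout
        storage.add(location);
        catalog.put(table, location);
    }

    // Physical semantics: DROP removes the entry and deletes the data.
    void dropManaged(String table) {
        String location = catalog.remove(table);
        if (location != null) {
            storage.remove(location);
        }
    }
}
```

In this model, dropping a mapped table leaves the external data in
place, while dropping a managed table removes it.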

Based on these conclusions, let's look at your questions.

To Ingo:

> if tables are dropped externally rather than through Flink SQL DDL, how would 
> Flink be able to remove the physical storage for it.

For now, a BDT can only be dropped through Flink SQL DDL.

To wenlong:

> How the built-in table would be persisted in Catalog?

Just like a general Flink table: the catalog persists the schema and
options of the table.

> Is it possible to read historical data from the file store first and then 
> fetch new changes from the log store? something like a hybrid source, but I 
> think we need a mechanism to get exactly-once semantic.

This can be implemented, but we need to save the Kafka offset of the
current checkpoint in the snapshot, so that we can switch accurately
between file and log. However, this is not in the MVP.
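
A rough sketch of the switch, under the assumption that each snapshot
records the next log offset to read. The types below are hypothetical
and the log is modeled as an in-memory list rather than Kafka:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink or Kafka API.
class HybridReadSketch {
    // A snapshot holds the file-store contents plus the log offset that
    // was current when the checkpoint producing it completed.
    static final class Snapshot {
        final List<String> fileRecords;
        final long nextLogOffset;

        Snapshot(List<String> fileRecords, long nextLogOffset) {
            this.fileRecords = fileRecords;
            this.nextLogOffset = nextLogOffset;
        }
    }

    // Read the history from the snapshot, then continue from the
    // recorded offset, so no change is skipped or read twice.
    static List<String> read(Snapshot snapshot, List<String> log) {
        List<String> out = new ArrayList<>(snapshot.fileRecords);
        for (long i = snapshot.nextLogOffset; i < log.size(); i++) {
            out.add(log.get((int) i));
        }
        return out;
    }
}
```

Without the offset stored in the snapshot, the reader would have to
guess where the file data ends in the log, which is where duplicates
or gaps would come from.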

To Ingo and wenlong:

> Currently a catalog can provide a default table factory and would be used as 
> the top priority factory, what would happen after the default factory was 
> introduced.

- Option 1: If a table is created without the connector option, it is
forcibly translated to a BDT.
- Option 2: Introduce new grammar, for example "CREATE MANAGED
TABLE ...". This keeps BDT separate from the catalog's default table,
and the catalog can still define its own managed tables.
- Option 3: Create the table without the connector option, but
introduce an interface for Catalog, for example
"SupportsFlinkManagedTable". Catalogs that can support BDT implement it
(InMemoryCatalog, HiveCatalog). Catalogs that do not support BDT can
implement their own managed tables (IcebergCatalog; such catalogs do
not even support other Flink tables).
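
As an illustration of Option 3 only, the resolution could look roughly
like the sketch below. Apart from the name "SupportsFlinkManagedTable",
everything here is an assumption: the Catalog stand-in, the two example
catalog classes, and the returned labels are hypothetical and are not
existing Flink API.

```java
import java.util.Map;

// Hypothetical sketch of Option 3, not Flink API.
class ManagedTableResolutionSketch {
    // Minimal stand-in for Flink's Catalog.
    interface Catalog {}

    // Marker interface named in Option 3; its shape is an assumption.
    interface SupportsFlinkManagedTable {}

    // A catalog that opts in to BDT (like InMemoryCatalog or HiveCatalog).
    static final class InMemoryLikeCatalog
            implements Catalog, SupportsFlinkManagedTable {}

    // A catalog with its own managed tables (like IcebergCatalog).
    static final class IcebergLikeCatalog implements Catalog {}

    // Decide which kind of table a CREATE TABLE statement produces.
    static String resolve(Catalog catalog, Map<String, String> options) {
        String connector = options.get("connector");
        if (connector != null) {
            return "connector:" + connector; // ordinary mapped table
        }
        if (catalog instanceof SupportsFlinkManagedTable) {
            return "flink-managed"; // BDT
        }
        return "catalog-managed"; // the catalog's own default factory
    }
}
```

With this shape, a table without the connector option becomes a BDT
only in catalogs that opt in; other catalogs keep their current
default-factory behavior.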

Best,
Jingsong

On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl <wenlong88....@gmail.com> wrote:
>
> Hi Jingsong, thanks for the proposal, providing a built-in storage solution
> for users will make flink SQL much more easier to use in production.
>
> I have some questions which may be missed in the FLIP, but may be important
> IMO:
> 1. Is it possible to read historical data from the file store first and
> then fetch new changes from the log store? something like a hybrid source,
> but I think we need a mechanism to get exactly-once semantic.
> 2. How the built-in table would be persisted in Catalog?
> 3. Currently a catalog can provide a default table factory and would be
> used as the top priority factory, what would happen after the default
> factory was introduced.
>
> On Wed, 20 Oct 2021 at 19:35, Ingo Bürk <i...@ververica.com> wrote:
>
> > Hi Jingsong,
> >
> > thank you for writing up the proposal. The benefits such a mechanism will
> > bring will be very valuable! I haven't yet looked into this in detail, but
> > one question came to my mind immediately:
> >
> > The DDL for these tables seems to rely on there not being a 'connector'
> > option. However, catalogs can provide a custom factory, and thus tables
> > don't necessarily need to contain such an option already today. How will
> > this interact / work with catalogs? I think there are more points regarding
> > interaction with catalogs, e.g. if tables are dropped externally rather
> > than through Flink SQL DDL, how would Flink be able to remove the physical
> > storage for it.
> >
> >
> > Best
> > Ingo
> >
> > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li <jingsongl...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > Kurt and I propose to introduce built-in storage support for dynamic
> > > table, a truly unified changelog & table representation, from Flink
> > > SQL’s perspective. We believe this kind of storage will improve the
> > > usability a lot.
> > >
> > > We want to highlight some characteristics about this storage:
> > >
> > > - It’s a built-in storage for Flink SQL
> > > ** Improve usability issues
> > > ** Flink DDL is no longer just a mapping, but a real creation for these
> > > tables
> > > ** Masks & abstracts the underlying technical details, no annoying
> > > options
> > >
> > > - Supports subsecond streaming write & consumption
> > > ** It could be backed by a service-oriented message queue (Like Kafka)
> > > ** High throughput scan capability
> > > ** Filesystem with columnar formats would be an ideal choice just like
> > > iceberg/hudi does.
> > >
> > > - More importantly, in order to solve the cognitive bar, storage needs
> > > to automatically address various Insert/Update/Delete inputs and table
> > > definitions
> > > ** Receive any type of changelog
> > > ** Table can have primary key or no primary key
> > >
> > > Looking forward to your feedback.
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > >
> > > Best,
> > > Jingsong Lee
> > >
> >



-- 
Best, Jingsong Lee
