Hi Till,

We have discussed the possibility of putting this FLIP into another
repository offline
with Stephan and Timo. This looks similar with another under going effort
which trying
to put all connectors outside the Flink core repository.

>From the motivation and scope of this FLIP, it's quite different from
current connectors in
some aspects. What we are trying to offer is some kind of built-in storage,
or we can call it
internal/managed tables, compared with current connectors, they kind of
express external
tables of Flink SQL. Functionality-wise, this managed table would have more
ability than
all these connectors, since we controlled the implementation of such
storage. Thus this table
storage will interact with lots SQL components, like metadata handling,
optimization, execution,
etc.

However we do see some potential benefits if we choose to put it outside
Flink:
- We may achieve more rapid development speed and maybe more frequent
release.
- Force us to think really clearly about the interfaces it should be,
because we don't have
the shortcut to modify core & connector codes all at the same time.

But we also can't ignore the overhead:
- We almost need everything that is discussed in the splitting connectors
thread.
- We have to create lots more interface than TableSource/TableSink to make
it just work at the first
place, e.g. interfaces to express such tables should be managed by Flink,
interfaces to express the
physical capability of the storage then it can be bridged to SQL optimizer
and executor.
- If we create lots of interfaces with only one implementation, that sounds
overengineering to me.

Combining the pros and cons above, what we are trying to do is firstly
implement it in a feature branch,
and also keep good engineering and design in mind. At some point we
re-evaluate the decision whether
to put it inside or outside the Flink core. What do you think?

Best,
Kurt


On Fri, Oct 29, 2021 at 11:53 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Jingsong,
>
> Thanks for creating this FLIP. I don't have a lot to add because I am not
> very familiar with the SQL components. While reading the FLIP I was
> wondering what would we need in Flink to build something like the BDT
> feature outside of Flink as a kind of extension? Would something like this
> be possible? Maybe the answer is a quick no ;-)
>
> Cheers,
> Till
>
> On Thu, Oct 28, 2021 at 8:06 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I updated FLIP based on your feedback:
> >
> > 1. Introduce interfaces: GenericCatalog, ManagedTableFactory,
> > TableDescriptor.forManaged
> >
> > 2. Introduce log.scan.startup.mode (default initial) to Hybrid source.
> >
> > 3. Add description to miss dropped table.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li <jingsongl...@gmail.com>
> > wrote:
> > >
> > > Hi Ingo,
> > >
> > > Really appreciate your feedback.
> > >
> > > #1. The reason why we insist on using no "connector" option is that we
> > > want to bring the following design to users:
> > > - With the "connector" option, it is a mapping, unmanaged table.
> > > - Without the "connector" option, it is a managed table. It may be an
> > > Iceberg managed table, or may be a JDBC managed table, or may be a
> > > Flink managed table.
> > >
> > > #2. About:
> > > CREATE TABLE T (f0 INT);
> > > ALTER TABLE T SET ('connector' = '…');
> > >
> > > I think it is dangerous, even for a generic table. The managed table
> > > should prohibit it.
> > >
> > > #3. DDL and Table API
> > >
> > > You are right, Table Api should be a superset of SQL. There is no
> > > doubt that it should support BDT.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk <i...@ververica.com> wrote:
> > > >
> > > > Hi Jingsong,
> > > >
> > > > thanks again for the answers. I think requiring catalogs to implement
> > an
> > > > interface to support BDTs is something we'll need (though personally
> I
> > > > still prefer explicit DDL here over the "no connector option"
> > approach).
> > > >
> > > > What about more edge cases like
> > > >
> > > > CREATE TABLE T (f0 INT);
> > > > ALTER TABLE T SET ('connector' = '…');
> > > >
> > > > This would have to first create the physical storage and then delete
> it
> > > > again, right?
> > > >
> > > > On a separate note, he FLIP currently only discusses SQL DDL, and you
> > have
> > > > also mentioned
> > > >
> > > > > BDT only can be dropped by Flink SQL DDL now.
> > > >
> > > > Something Flink suffers from a lot is inconsistencies across APIs. I
> > think
> > > > it is important that we support features on all major APIs, i.e.
> > including
> > > > Table API.
> > > > For example for creating a BDT this would mean e.g. adding something
> > like
> > > > #forManaged(…) to TableDescriptor.
> > > >
> > > >
> > > > Best
> > > > Ingo
> > > >
> > > > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li <jingsongl...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Ingo,
> > > > >
> > > > > I thought again.
> > > > >
> > > > > I'll try to sort out the current catalog behaviors.
> > > > > Actually, we can divide catalogs into three categories:
> > > > >
> > > > > 1. ExternalCatalog: it can only read or create a single table kind
> > > > > which connects to external storage. TableFactory is provided by
> > > > > Catalog, which can have nothing to do with Flink's Factory
> discovery
> > > > > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog,
> etc.
> > > > > Catalog manages the life cycle of its **managed** tables, which
> means
> > > > > that creation and drop will affect the real physical storage. The
> DDL
> > > > > has no "connector" option.
> > > > >
> > > > > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved
> and
> > > > > factories are created through Flink's factory discovery mechanism.
> At
> > > > > this time, the catalog is actually only a storage medium for saving
> > > > > schema and options, such as GenericInMemoryCatalog. Catalog only
> > saves
> > > > > meta information and does not manage the underlying physical
> storage
> > > > > of tables. These tables are **unmanaged**. The DDL must have a
> > > > > "connector" option.
> > > > >
> > > > > 3. HybridCatalog: It can save both its own **managed** table and
> > > > > generic Flink **unmanaged** table, such as HiveCatalog.
> > > > >
> > > > > We want to use the "connector" option to distinguish whether it is
> > > > > managed or not.
> > > > >
> > > > > Now, consider the Flink managed table in this FLIP.
> > > > > a. ExternalCatalog can not support Flink managed tables.
> > > > > b. GenericCatalog can support Flink managed tables without the
> > > > > "connector" option.
> > > > > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog
> > to
> > > > > support Flink managed tables:
> > > > > - with "connector" option in Flink dialect is unmanaged tables
> > > > > - Hive DDL in Hive dialect is Hive managed tables, the parser will
> > add
> > > > > "connector = hive" automatically. At present, there are many
> > > > > differences between Flink DDL and Hive DDL, and even their features
> > > > > have many differences.
> > > > > - without "connector" option in Flink dialect is Flink managed
> > tables.
> > > > >
> > > > > In this way, we can support Flink managed tables while maintaining
> > > > > compatibility.
> > > > >
> > > > > Anyway, we need introduce a "SupportsFlinkManagedTable" to catalog.
> > > > >
> > > > > ############## Back to your question #################
> > > > >
> > > > > > but we should make it clear that this is a limitation and
> probably
> > > > > document how users can clean up the underlying physical storage
> > manually in
> > > > > this case
> > > > >
> > > > > Yes, it's strange that the catalog should manage tables, but some
> > > > > catalogs don't have this ability.
> > > > > - For PersistentCatalog, the meta will continue until the
> underlying
> > > > > physical storage is deleted.
> > > > > - For InMemoryCatalog, yes, we should document it for the
> underlying
> > > > > physical storage of Flink managed tables.
> > > > >
> > > > > > the HiveCatalog doesn't list a 'connector' option for its tables.
> > > > >
> > > > > Actually, It can be divided into two steps: create and save:
> > > > > - When creating a table, the table seen by HiveCatalog must have
> > > > > "connector = hive", which is the hive table (Hive managed table).
> You
> > > > > can see the "HiveCatalog.isHiveTable".
> > > > > - When saving the table, it will remove the connector of the hive
> > > > > table. We can do this: with "connector" option is Flink generic
> > table,
> > > > > without "connector" option is Hive table, with "flink-managed =
> true"
> > > > > is Flink managed table.
> > > > >
> > > > > Best,
> > > > > Jingsong Lee
> > > > >
> > > > > On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk <i...@ververica.com>
> > wrote:
> > > > > >
> > > > > > Hi JingSong,
> > > > > >
> > > > > > thank you for the answers!
> > > > > >
> > > > > > > BDT only can be dropped by Flink SQL DDL now.
> > > > > >
> > > > > > Maybe I'm misunderstanding, but that's only true from the Flink
> > side.
> > > > > What
> > > > > > I meant is that a table could disappear from a catalog entirely
> > outside
> > > > > of
> > > > > > Flink. As a simple example, consider a catalog which represents
> an
> > IMAP
> > > > > > mail server and each folder as a table. If a folder is deleted
> > from the
> > > > > > mail account, the table would disappear, but Flink would have no
> > way of
> > > > > > knowing that. I don't see a way around this problem, to be
> honest,
> > but we
> > > > > > should make it clear that this is a limitation and probably
> > document how
> > > > > > users can clean up the underlying physical storage manually in
> > this case?
> > > > > >
> > > > > > > - Option 1: Create table without the connector option, the
> table
> > will
> > > > > > > be forcibly translated to BDT.
> > > > > >
> > > > > > This would be a breaking change, right? If I remember correctly
> > (but I
> > > > > > might not :-)), even the HiveCatalog doesn't list a 'connector'
> > option
> > > > > for
> > > > > > its tables.
> > > > > >
> > > > > > This approach is also very implicit, and creating physical
> storage
> > isn't
> > > > > > exactly "free", so I personally would favor one of the other
> > approaches.
> > > > > > Option (2) would be explicit for the end user, while Option (3)
> is
> > again
> > > > > > implicit for the user and only explicit for the catalog
> > implementor, so I
> > > > > > kind of favor Option (2) because I feel that users should be
> aware
> > of
> > > > > > creating a Flink-managed table.
> > > > > >
> > > > > > We also need to consider the upgrade path here: if a catalog
> > exposes
> > > > > tables
> > > > > > without 'connector' options today, we need to make sure that once
> > this
> > > > > FLIP
> > > > > > is implemented no errors are thrown because codepaths assume that
> > > > > physical
> > > > > > storage must exist for such tables (since they were created
> before
> > the
> > > > > > FLIP).
> > > > > >
> > > > > >
> > > > > > Best
> > > > > > Ingo
> > > > > >
> > > > > > On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li <
> > jingsongl...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Ingo and wenlong,
> > > > > > >
> > > > > > > Thanks for your feedback. Very good questions!
> > > > > > >
> > > > > > > (Built-in Dynamic Table is simplified as BDT)
> > > > > > >
> > > > > > > First, let's look at the following questions:
> > > > > > >
> > > > > > > 1. Does BDT want a separate catalog or can it be placed in all
> > > > > > > catalogs (such as InMemoryCatalog and HiveCatalog)?
> > > > > > >  - BDT wants the latter. Because in iceberg, we have seen that
> a
> > > > > > > separate catalog undoubtedly needs to recreate a set of
> > catalogs. We
> > > > > > > often don't know whether it is Flink's HiveCatalog or iceberg's
> > > > > > > HiveCatalog. This brings not only duplication of work, but also
> > > > > > > confusion.
> > > > > > >  - How does catalog persist BDT? As a general Flink table,
> > persist the
> > > > > > > schema and options of the table.
> > > > > > >
> > > > > > > 2. Is Flink's DDL mapping or real physical storage?
> > > > > > > - Mapping: creating and dropping tables only change the mapping
> > > > > > > relationship,
> > > > > > > - Physical storage: creating and dropping tables will actually
> > delete
> > > > > > > the underlying storage
> > > > > > > - Status quo: the general connectors are all mapping, while the
> > self
> > > > > > > managed tables of Catalog are real storage.
> > > > > > > - BDT wants real physical storage, because it can provide
> > database
> > > > > > > level experience, and BDT wants to be orthogonal to catalog.
> > > > > > > Therefore, BDT is bound to break the current situation and
> > become a
> > > > > > > new concept.
> > > > > > >
> > > > > > > Based on the above conclusion, let's look at your question.
> > > > > > >
> > > > > > > To Ingo:
> > > > > > >
> > > > > > > > if tables are dropped externally rather than through Flink
> SQL
> > DDL,
> > > > > how
> > > > > > > would Flink be able to remove the physical storage for it.
> > > > > > >
> > > > > > > BDT only can be dropped by Flink SQL DDL now.
> > > > > > >
> > > > > > > To wenlong:
> > > > > > >
> > > > > > > > How the built-in table would be persisted in Catalog?
> > > > > > >
> > > > > > > Just like a general Flink table, persist the schema and options
> > of the
> > > > > > > table.
> > > > > > >
> > > > > > > > Is it possible to read historical data from the file store
> > first and
> > > > > > > then fetch new changes from the log store? something like a
> > hybrid
> > > > > source,
> > > > > > > but I think we need a mechanism to get exactly-once semantic.
> > > > > > >
> > > > > > > This can be implemented, but we need to save the Kafka offset
> of
> > the
> > > > > > > current checkpoint in the snapshot, so that we can accurately
> > switch
> > > > > > > between file and log. But this is not in MVP.
> > > > > > >
> > > > > > > To Ingo and wenlong:
> > > > > > >
> > > > > > > > Currently a catalog can provide a default table factory and
> > would be
> > > > > > > used as the top priority factory, what would happen after the
> > default
> > > > > > > factory was introduced.
> > > > > > >
> > > > > > > - Option 1: Create table without the connector option, the
> table
> > will
> > > > > > > be forcibly translated to BDT.
> > > > > > > - Option 2: Introduce new grammar, for example, "CREATE MANAGED
> > > > > > > TABLE...", this will separate from the default table of
> catalog.
> > > > > > > Catalog can define its own managed tables.
> > > > > > > - Option 3: Create table without the connector option, but
> > introduce
> > > > > > > interface to Catalog, for example, "SupportsFlinkManagedTable".
> > The
> > > > > > > catalog that can support BDT can implement
> > > > > > > it.(InMemoryCatalog,HiveCatalog). Catalogs that do not support
> > BDT can
> > > > > > > implement their own managed tables.(IcebergCatalog, these
> > catalogs do
> > > > > > > not even support other flink tables)
> > > > > > >
> > > > > > > Best,
> > > > > > > Jingsong
> > > > > > >
> > > > > > > On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl <
> > wenlong88....@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Jingsong, thanks for the proposal, providing a built-in
> > storage
> > > > > > > solution
> > > > > > > > for users will make flink SQL much more easier to use in
> > production.
> > > > > > > >
> > > > > > > > I have some questions which may be missed in the FLIP, but
> may
> > be
> > > > > > > important
> > > > > > > > IMO:
> > > > > > > > 1. Is it possible to read historical data from the file store
> > first
> > > > > and
> > > > > > > > then fetch new changes from the log store? something like a
> > hybrid
> > > > > > > source,
> > > > > > > > but I think we need a mechanism to get exactly-once semantic.
> > > > > > > > 2. How the built-in table would be persisted in Catalog?
> > > > > > > > 3. Currently a catalog can provide a default table factory
> and
> > would
> > > > > be
> > > > > > > > used as the top priority factory, what would happen after the
> > default
> > > > > > > > factory was introduced.
> > > > > > > >
> > > > > > > > On Wed, 20 Oct 2021 at 19:35, Ingo Bürk <i...@ververica.com>
> > wrote:
> > > > > > > >
> > > > > > > > > Hi Jingsong,
> > > > > > > > >
> > > > > > > > > thank you for writing up the proposal. The benefits such a
> > > > > mechanism
> > > > > > > will
> > > > > > > > > bring will be very valuable! I haven't yet looked into this
> > in
> > > > > detail,
> > > > > > > but
> > > > > > > > > one question came to my mind immediately:
> > > > > > > > >
> > > > > > > > > The DDL for these tables seems to rely on there not being a
> > > > > 'connector'
> > > > > > > > > option. However, catalogs can provide a custom factory, and
> > thus
> > > > > tables
> > > > > > > > > don't necessarily need to contain such an option already
> > today. How
> > > > > > > will
> > > > > > > > > this interact / work with catalogs? I think there are more
> > points
> > > > > > > regarding
> > > > > > > > > interaction with catalogs, e.g. if tables are dropped
> > externally
> > > > > rather
> > > > > > > > > than through Flink SQL DDL, how would Flink be able to
> > remove the
> > > > > > > physical
> > > > > > > > > storage for it.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best
> > > > > > > > > Ingo
> > > > > > > > >
> > > > > > > > > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li <
> > > > > jingsongl...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > Kurt and I propose to introduce built-in storage support
> > for
> > > > > dynamic
> > > > > > > > > > table, a truly unified changelog & table representation,
> > from
> > > > > Flink
> > > > > > > > > > SQL’s perspective. We believe this kind of storage will
> > improve
> > > > > the
> > > > > > > > > > usability a lot.
> > > > > > > > > >
> > > > > > > > > > We want to highlight some characteristics about this
> > storage:
> > > > > > > > > >
> > > > > > > > > > - It’s a built-in storage for Flink SQL
> > > > > > > > > > ** Improve usability issues
> > > > > > > > > > ** Flink DDL is no longer just a mapping, but a real
> > creation for
> > > > > > > these
> > > > > > > > > > tables
> > > > > > > > > > ** Masks & abstracts the underlying technical details, no
> > > > > annoying
> > > > > > > > > options
> > > > > > > > > >
> > > > > > > > > > - Supports subsecond streaming write & consumption
> > > > > > > > > > ** It could be backed by a service-oriented message queue
> > (Like
> > > > > > > Kafka)
> > > > > > > > > > ** High throughput scan capability
> > > > > > > > > > ** Filesystem with columnar formats would be an ideal
> > choice just
> > > > > > > like
> > > > > > > > > > iceberg/hudi does.
> > > > > > > > > >
> > > > > > > > > > - More importantly, in order to solve the cognitive bar,
> > storage
> > > > > > > needs
> > > > > > > > > > to automatically address various Insert/Update/Delete
> > inputs and
> > > > > > > table
> > > > > > > > > > definitions
> > > > > > > > > > ** Receive any type of changelog
> > > > > > > > > > ** Table can have primary key or no primary key
> > > > > > > > > >
> > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Jingsong Lee
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Best, Jingsong Lee
> > > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best, Jingsong Lee
> > > > >
> > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>

Reply via email to