Hi Timo,

Thanks for your thoughtful inputs!

Yes, expanding the MATERIALIZED VIEW(MV) could achieve the same function,
but our primary concern is that by using a view, we might limit future
opportunities
to optimize queries through automatic materialization rewriting [1],
leveraging
the support for MV by physical storage. This is because we would be breaking
the intuitive semantics of a materialized view (a materialized view
represents
the result of a query) by allowing data modifications, thus losing the
potential
for such optimizations.

With these considerations in mind, we were inspired by Google Looker's
Persistent
Derived Table [2]. PDT is designed for building Looker's automated
modeling,
aligning with our purpose for the stream-batch automatic pipeline.
Therefore,
we are considering another candidate, Derived Table, the term 'derive'
suggests a
query, and 'table' retains modifiability. This approach would not disrupt
our current
concept of a dynamic table, preserving the future utility of MVs.

Conceptually, a Derived Table is a Dynamic Table + Continuous Query. By
introducing
 a new concept Derived Table for this FLIP, this makes all concepts to play
together nicely.

What do you think about this?

[1] https://calcite.apache.org/docs/materialized_views.html
[2]
https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables


Best,
Lincoln Lee


Timo Walther <twal...@apache.org> 于2024年3月22日周五 17:54写道:

> Hi Ron,
>
> thanks for the detailed answer. Sorry, for my late reply, we had a
> conference that kept me busy.
>
>  > In the current concept[1], it actually includes: Dynamic Tables &
>  > & Continuous Query. Dynamic Table is just an abstract logical concept
>
> This explanation makes sense to me. But the docs also say "A continuous
> query is evaluated on the dynamic table yielding a new dynamic table.".
> So even our regular CREATE TABLEs are considered dynamic tables. This
> can also be seen in the diagram "Dynamic Table -> Continuous Query ->
> Dynamic Table". Currently, Flink queries can only be executed on Dynamic
> Tables.
>
>  > In essence, a materialized view represents the result of a query.
>
> Isn't that what your proposal does as well?
>
>  > the object of the suspend operation is the refresh task of the
> dynamic table
>
> I understand that Snowflake uses the term [1] to merge their concepts of
> STREAM, TASK, and TABLE into one piece of concept. But Flink has no
> concept of a "refresh task". Also, they already introduced MATERIALIZED
> VIEW. Flink is in the convenient position that the concept of
> materialized views is not taken (reserved maybe for exactly this use
> case?). And SQL standard concept could be "slightly adapted" to our
> needs. Looking at other vendors like Postgres[2], they also use
> `REFRESH` commands so why not adding additional commands such as DELETE
> or UPDATE. Oracle supports  "ON PREBUILT TABLE clause tells the database
> to use an existing table segment"[3] which comes closer to what we want
> as well.
>
>  > it is not intended to support data modification
>
> This is an argument that I understand. But we as Flink could allow data
> modifications. This way we are only extending the standard and don't
> introduce new concepts.
>
> If we can't agree on using MATERIALIZED VIEW concept. We should fix our
> syntax in a Flink 2.0 effort. Making regular tables bounded and dynamic
> tables unbounded. We would be closer to the SQL standard with this and
> pave the way for the future. I would actually support this if all
> concepts play together nicely.
>
>  > In the future, we can consider extending the statement set syntax to
> support the creation of multiple dynamic tables.
>
> It's good that we called the concept STATEMENT SET. This allows us to
> defined CREATE TABLE within. Even if it might look a bit confusing.
>
> Regards,
> Timo
>
> [1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
> [2]
> https://www.postgresql.org/docs/current/sql-creatematerializedview.html
> [3] https://oracle-base.com/articles/misc/materialized-views
>
> On 21.03.24 04:14, Feng Jin wrote:
> > Hi Ron and Lincoln
> >
> > Thanks for driving this discussion.  I believe it will greatly improve
> the
> > convenience of managing user real-time pipelines.
> >
> > I have some questions.
> >
> > *Regarding Limitations of Dynamic Table:*
> >
> >> Does not support modifying the select statement after the dynamic table
> > is created.
> >
> > Although currently we restrict users from modifying the query, I wonder
> if
> > we can provide a better way to help users rebuild it without affecting
> > downstream OLAP queries.
> >
> >
> > *Regarding the management of background jobs:*
> >
> > 1. From the documentation, the definitions SQL and job information are
> > stored in the Catalog. Does this mean that if a system needs to adapt to
> > Dynamic Tables, it also needs to store Flink's job information in the
> > corresponding system?
> > For example, does MySQL's Catalog need to store flink job information as
> > well?
> >
> >
> > 2. Users still need to consider how much memory is being used, how large
> > the concurrency is, which type of state backend is being used, and may
> need
> > to set TTL expiration.
> >
> >
> > *Regarding the Refresh Part:*
> >
> >> If the refresh mode is continuous and a background job is running,
> > caution should be taken with the refresh command as it can lead to
> > inconsistent data.
> >
> > When we submit a refresh command, can we help users detect if there are
> any
> > running jobs and automatically stop them before executing the refresh
> > command? Then wait for it to complete before restarting the background
> > streaming job?
> >
> > Best,
> > Feng
> >
> > On Tue, Mar 19, 2024 at 9:40 PM Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
> >
> >> Hi Yun,
> >>
> >> Thank you very much for your valuable input!
> >>
> >> Incremental mode is indeed an attractive idea, we have also discussed
> >> this, but in the current design,
> >>
> >> we first provided two refresh modes: CONTINUOUS and
> >> FULL. Incremental mode can be introduced
> >>
> >> once the execution layer has the capability.
> >>
> >> My answer for the two questions:
> >>
> >> 1.
> >> Yes, cascading is a good question.  Current proposal provides a
> >> freshness that defines a dynamic
> >> table relative to the base table’s lag. If users need to consider the
> >> end-to-end freshness of multiple
> >> cascaded dynamic tables, he can manually split them for now. Of
> >> course, how to let multiple cascaded
> >>   or dependent dynamic tables complete the freshness definition in a
> >> simpler way, I think it can be
> >> extended in the future.
> >>
> >> 2.
> >> Cascading refresh is also a part we focus on discussing. In this flip,
> >> we hope to focus as much as
> >> possible on the core features (as it already involves a lot things),
> >> so we did not directly introduce related
> >>   syntax. However, based on the current design, combined with the
> >> catalog and lineage, theoretically,
> >> users can also finish the cascading refresh.
> >>
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >>
> >> Yun Tang <myas...@live.com> 于2024年3月19日周二 13:45写道:
> >>
> >>> Hi Lincoln,
> >>>
> >>> Thanks for driving this discussion, and I am so excited to see this
> topic
> >>> being discussed in the Flink community!
> >>>
> >>>  From my point of view, instead of the work of unifying streaming and
> >> batch
> >>> in DataStream API [1], this FLIP actually could make users benefit from
> >> one
> >>> engine to rule batch & streaming.
> >>>
> >>> If we treat this FLIP as an open-source implementation of Snowflake's
> >>> dynamic tables [2], we still lack an incremental refresh mode to make
> the
> >>> ETL near real-time with a much cheaper computation cost. However, I
> think
> >>> this could be done under the current design by introducing another
> >> refresh
> >>> mode in the future. Although the extra work of incremental view
> >> maintenance
> >>> would be much larger.
> >>>
> >>> For the FLIP itself, I have several questions below:
> >>>
> >>> 1. It seems this FLIP does not consider the lag of refreshes across ETL
> >>> layers from ODS ---> DWD ---> APP [3]. We currently only consider the
> >>> scheduler interval, which means we cannot use lag to automatically
> >> schedule
> >>> the upfront micro-batch jobs to do the work.
> >>> 2. To support the automagical refreshes, we should consider the lineage
> >> in
> >>> the catalog or somewhere else.
> >>>
> >>>
> >>> [1]
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API
> >>> [2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
> >>> [3] https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh
> >>>
> >>> Best
> >>> Yun Tang
> >>>
> >>>
> >>> ________________________________
> >>> From: Lincoln Lee <lincoln.8...@gmail.com>
> >>> Sent: Thursday, March 14, 2024 14:35
> >>> To: dev@flink.apache.org <dev@flink.apache.org>
> >>> Subject: Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for
> >>> Simplifying Data Pipelines
> >>>
> >>> Hi Jing,
> >>>
> >>> Thanks for your attention to this flip! I'll try to answer the
> following
> >>> questions.
> >>>
> >>>> 1. How to define query of dynamic table?
> >>>> Use flink sql or introducing new syntax?
> >>>> If use flink sql, how to handle the difference in SQL between
> streaming
> >>> and
> >>>> batch processing?
> >>>> For example, a query including window aggregate based on processing
> >> time?
> >>>> or a query including global order by?
> >>>
> >>> Similar to `CREATE TABLE AS query`, here the `query` also uses Flink
> sql
> >>> and
> >>>
> >>> doesn't introduce a totally new syntax.
> >>> We will not change the status respect to
> >>>
> >>> the difference in functionality of flink sql itself on streaming and
> >>> batch, for example,
> >>>
> >>> the proctime window agg on streaming and global sort on batch that you
> >>> mentioned,
> >>>
> >>> in fact, do not work properly in the
> >>> other mode, so when the user modifies the
> >>>
> >>> refresh mode of a dynamic table that is not supported, we will throw an
> >>> exception.
> >>>
> >>>> 2. Whether modify the query of dynamic table is allowed?
> >>>> Or we could only refresh a dynamic table based on the initial query?
> >>>
> >>> Yes, in the current design, the query definition of the
> >>> dynamic table is not allowed
> >>>
> >>>   to be modified, and you can only refresh the data based on the
> >>> initial definition.
> >>>
> >>>> 3. How to use dynamic table?
> >>>> The dynamic table seems to be similar to the materialized view.  Will
> >> we
> >>> do
> >>>> something like materialized view rewriting during the optimization?
> >>>
> >>> It's true that dynamic table and materialized view
> >>> are similar in some ways, but as Ron
> >>>
> >>> explains
> >>> there are differences. In terms of optimization, automated
> >>> materialization discovery
> >>>
> >>> similar to that supported by calcite is also a potential possibility,
> >>> perhaps with the
> >>>
> >>> addition of automated rewriting in the future.
> >>>
> >>>
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
> >>>
> >>> Ron liu <ron9....@gmail.com> 于2024年3月14日周四 14:01写道:
> >>>
> >>>> Hi, Timo
> >>>>
> >>>> Sorry for later response,  thanks for your feedback.
> >>>> Regarding your questions:
> >>>>
> >>>>> Flink has introduced the concept of Dynamic Tables many years ago.
> >> How
> >>>>
> >>>> does the term "Dynamic Table" fit into Flink's regular tables and also
> >>>>
> >>>> how does it relate to Table API?
> >>>>
> >>>>
> >>>>> I fear that adding the DYNAMIC TABLE keyword could cause confusion
> >> for
> >>>>> users, because a term for regular CREATE TABLE (that can be "kind of
> >>>>> dynamic" as well and is backed by a changelog) is then missing. Also
> >>>>> given that we call our connectors for those tables,
> >> DynamicTableSource
> >>>>> and DynamicTableSink.
> >>>>
> >>>>
> >>>>> In general, I find it contradicting that a TABLE can be "paused" or
> >>>>> "resumed". From an English language perspective, this does sound
> >>>>> incorrect. In my opinion (without much research yet), a continuous
> >>>>> updating trigger should rather be modelled as a CREATE MATERIALIZED
> >>> VIEW
> >>>>> (which users are familiar with?) or a new concept such as a CREATE
> >> TASK
> >>>>> (that can be paused and resumed?).
> >>>>
> >>>>
> >>>> 1.
> >>>> In the current concept[1], it actually includes: Dynamic Tables &
> >>>> Continuous Query. Dynamic Table is just an abstract
> >>>> logical concept
> >>>> , which in its physical form represents either a table or a changelog
> >>>> stream. It requires the combination with Continuous Query to achieve
> >>>> dynamic updates of the target table similar to a database’s
> >>>> Materialized View.
> >>>> We hope to upgrade the Dynamic Table to a real entity that users can
> >>>> operate, which combines the logical concepts of Dynamic Tables +
> >>>> Continuous Query. By integrating the definition of tables and queries,
> >>>> it can achieve functions similar to Materialized Views, simplifying
> >>>> users' data processing pipelines.
> >>>> So, the object of the suspend operation is the refresh task of the
> >>>> dynamic table. The command  `ALTER DYNAMIC TABLE table_name SUSPEND `
> >>>> is actually a shorthand for `ALTER DYNAMIC TABLE table_name SUSPEND
> >>>> REFRESH` (if written in full for clarity, we can also modify it).
> >>>>
> >>>>   2. Initially, we also considered Materialized Views
> >>>> , but ultimately decided against them. Materialized views are designed
> >>>> to enhance query performance for workloads that consist of common,
> >>>> repetitive query patterns. In essence, a materialized view represents
> >>>> the result of a query.
> >>>> However, it is not intended to support data modification. For
> >>>> Lakehouse scenarios, where the ability to delete or update data is
> >>>> crucial (such as compliance with GDPR, FLIP-2), materialized views
> >>>> fall short.
> >>>>
> >>>> 3.
> >>>> Compared to CREATE (regular) TABLE, CREATE DYNAMIC TABLE not only
> >>>> defines metadata in the catalog but also automatically initiates a
> >>>> data refresh task based on the query specified during table creation.
> >>>> It dynamically executes data updates. Users can focus on data
> >>>> dependencies and data generation logic.
> >>>>
> >>>> 4.
> >>>> The new dynamic table does not conflict with the existing
> >>>> DynamicTableSource and DynamicTableSink interfaces. For the developer,
> >>>> all that needs to be implemented is the new CatalogDynamicTable,
> >>>> without changing the implementation of source and sink.
> >>>>
> >>>> 5. For now, the FLIP does not consider supporting Table API operations
> >> on
> >>>> Dynamic Table
> >>>> . However, once the SQL syntax is finalized, we can discuss this in a
> >>>> separate FLIP. Currently, I have a rough idea: the Table API should
> >>>> also introduce
> >>>> DynamicTable operation interfaces
> >>>>   corresponding to the existing Table interfaces.
> >>>> The TableEnvironment
> >>>>   will provide relevant methods to support various dynamic table
> >>>> operations. The goal for the new Dynamic Table is to offer users an
> >>>> experience similar to using a database, which is why we prioritize
> >>>> SQL-based approaches initially.
> >>>>
> >>>>> How do you envision re-adding the functionality of a statement set,
> >>> that
> >>>>> fans out to multiple tables? This is a very important use case for
> >> data
> >>>>> pipelines.
> >>>>
> >>>>
> >>>> Multi-tables is indeed a very important user scenario. In the future,
> >>>> we can consider extending the statement set syntax to support the
> >>>> creation of multiple dynamic tables.
> >>>>
> >>>>
> >>>>>> Since the early days of Flink SQL, we were discussing `SELECT
> >> STREAM
> >>> *
> >>>>> FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and
> >>> EMIT,
> >>>>> into other keywords DYNAMIC TABLE and FRESHNESS. But the core
> >>>>> functionality is still there. I'm wondering if we should widen the
> >>> scope
> >>>>> (maybe not part of this FLIP but a new FLIP) to follow the standard
> >>> more
> >>>>> closely. Making `SELECT * FROM t` bounded by default and use new
> >> syntax
> >>>>> for the dynamic behavior. Flink 2.0 would be the perfect time for
> >> this,
> >>>>> however, it would require careful discussions. What do you think?
> >>>>
> >>>>
> >>>> The query part indeed requires a separate FLIP
> >>>> for discussion, as it involves changes to the default behavior.
> >>>>
> >>>>
> >>>> [1]
> >>>>
> >>>>
> >>>
> >>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables
> >>>>
> >>>>
> >>>> Best,
> >>>>
> >>>> Ron
> >>>>
> >>>>
> >>>> Jing Zhang <beyond1...@gmail.com> 于2024年3月13日周三 15:19写道:
> >>>>
> >>>>> Hi, Lincoln & Ron,
> >>>>>
> >>>>> Thanks for the proposal.
> >>>>>
> >>>>> I agree with the question raised by Timo.
> >>>>>
> >>>>> Besides, I have some other questions.
> >>>>> 1. How to define query of dynamic table?
> >>>>> Use flink sql or introducing new syntax?
> >>>>> If use flink sql, how to handle the difference in SQL between
> >> streaming
> >>>> and
> >>>>> batch processing?
> >>>>> For example, a query including window aggregate based on processing
> >>> time?
> >>>>> or a query including global order by?
> >>>>>
> >>>>> 2. Whether modify the query of dynamic table is allowed?
> >>>>> Or we could only refresh a dynamic table based on initial query?
> >>>>>
> >>>>> 3. How to use dynamic table?
> >>>>> The dynamic table seems to be similar with materialized view.  Will
> >> we
> >>> do
> >>>>> something like materialized view rewriting during the optimization?
> >>>>>
> >>>>> Best,
> >>>>> Jing Zhang
> >>>>>
> >>>>>
> >>>>> Timo Walther <twal...@apache.org> 于2024年3月13日周三 01:24写道:
> >>>>>
> >>>>>> Hi Lincoln & Ron,
> >>>>>>
> >>>>>> thanks for proposing this FLIP. I think a design similar to what
> >> you
> >>>>>> propose has been in the heads of many people, however, I'm
> >> wondering
> >>>> how
> >>>>>> this will fit into the bigger picture.
> >>>>>>
> >>>>>> I haven't deeply reviewed the FLIP yet, but would like to ask some
> >>>>>> initial questions:
> >>>>>>
> >>>>>> Flink has introduced the concept of Dynamic Tables many years ago.
> >>> How
> >>>>>> does the term "Dynamic Table" fit into Flink's regular tables and
> >>> also
> >>>>>> how does it relate to Table API?
> >>>>>>
> >>>>>> I fear that adding the DYNAMIC TABLE keyword could cause confusion
> >>> for
> >>>>>> users, because a term for regular CREATE TABLE (that can be "kind
> >> of
> >>>>>> dynamic" as well and is backed by a changelog) is then missing.
> >> Also
> >>>>>> given that we call our connectors for those tables,
> >>> DynamicTableSource
> >>>>>> and DynamicTableSink.
> >>>>>>
> >>>>>> In general, I find it contradicting that a TABLE can be "paused" or
> >>>>>> "resumed". From an English language perspective, this does sound
> >>>>>> incorrect. In my opinion (without much research yet), a continuous
> >>>>>> updating trigger should rather be modelled as a CREATE MATERIALIZED
> >>>> VIEW
> >>>>>> (which users are familiar with?) or a new concept such as a CREATE
> >>> TASK
> >>>>>> (that can be paused and resumed?).
> >>>>>>
> >>>>>> How do you envision re-adding the functionality of a statement set,
> >>>> that
> >>>>>> fans out to multiple tables? This is a very important use case for
> >>> data
> >>>>>> pipelines.
> >>>>>>
> >>>>>> Since the early days of Flink SQL, we were discussing `SELECT
> >> STREAM
> >>> *
> >>>>>> FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and
> >>>> EMIT,
> >>>>>> into other keywords DYNAMIC TABLE and FRESHNESS. But the core
> >>>>>> functionality is still there. I'm wondering if we should widen the
> >>>> scope
> >>>>>> (maybe not part of this FLIP but a new FLIP) to follow the standard
> >>>> more
> >>>>>> closely. Making `SELECT * FROM t` bounded by default and use new
> >>> syntax
> >>>>>> for the dynamic behavior. Flink 2.0 would be the perfect time for
> >>> this,
> >>>>>> however, it would require careful discussions. What do you think?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 11.03.24 08:23, Ron liu wrote:
> >>>>>>> Hi, Dev
> >>>>>>>
> >>>>>>>
> >>>>>>> Lincoln Lee and I would like to start a discussion about
> >> FLIP-435:
> >>>>>>> Introduce a  New Dynamic Table for Simplifying Data Pipelines.
> >>>>>>>
> >>>>>>>
> >>>>>>> This FLIP is designed to simplify the development of data
> >>> processing
> >>>>>>> pipelines. With Dynamic Tables with uniform SQL statements and
> >>>>>>> freshness, users can define batch and streaming transformations
> >> to
> >>>>>>> data in the same way, accelerate ETL pipeline development, and
> >>> manage
> >>>>>>> task scheduling automatically.
> >>>>>>>
> >>>>>>>
> >>>>>>> For more details, see FLIP-435 [1]. Looking forward to your
> >>> feedback.
> >>>>>>>
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>>
> >>>>>>> Lincoln & Ron
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>

Reply via email to