Hi Ron,
Sorry for joining the discussion late, and thanks for the effort.

I think the base idea is great; however, I have a couple of comments:
- I want to follow up on Timo's comments regarding the confusion between
"Dynamic Table" and the current Flink "Table". If the refactoring of the
system is to happen in 2.0, should we rename it in this FLIP (as suggested
in the thread) and address the holistic changes in a separate FLIP for 2.0?
- I am unclear on how the table interacts with other components. The
examples provided feel like a standalone ETL job; could you provide in the
FLIP an example where the table is further used in subsequent queries
(especially in batch mode)?
- I really like that the proposal keeps the unified batch and streaming
approach.
Best Regards
Ahmed Hamdy


On Fri, 22 Mar 2024 at 12:07, Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Timo,
>
> Thanks for your thoughtful inputs!
>
> Yes, expanding the MATERIALIZED VIEW (MV) could achieve the same function,
> but our primary concern is that by using a view, we might limit future
> opportunities to optimize queries through automatic materialization
> rewriting [1], leveraging the support for MV by physical storage. This is
> because we would be breaking the intuitive semantics of a materialized view
> (a materialized view represents the result of a query) by allowing data
> modifications, thus losing the potential for such optimizations.
>
> With these considerations in mind, we were inspired by Google Looker's
> Persistent Derived Table [2]. PDT is designed for building Looker's
> automated modeling, which aligns with our purpose for the stream-batch
> automatic pipeline. Therefore, we are considering another candidate,
> Derived Table: the term 'derive' suggests a query, and 'table' retains
> modifiability. This approach would not disrupt our current concept of a
> dynamic table, preserving the future utility of MVs.
>
> Conceptually, a Derived Table is a Dynamic Table + Continuous Query.
> Introducing a new concept, Derived Table, for this FLIP makes all concepts
> play together nicely.
>
> What do you think about this?
>
> [1] https://calcite.apache.org/docs/materialized_views.html
> [2] https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables
>
>
> Best,
> Lincoln Lee
>
>
> On Fri, 22 Mar 2024 at 17:54, Timo Walther <twal...@apache.org> wrote:
>
> > Hi Ron,
> >
> > thanks for the detailed answer. Sorry for my late reply; we had a
> > conference that kept me busy.
> >
> >  > In the current concept[1], it actually includes: Dynamic Tables &
> >  > Continuous Query. Dynamic Table is just an abstract logical concept
> >
> > This explanation makes sense to me. But the docs also say "A continuous
> > query is evaluated on the dynamic table yielding a new dynamic table.".
> > So even our regular CREATE TABLEs are considered dynamic tables. This
> > can also be seen in the diagram "Dynamic Table -> Continuous Query ->
> > Dynamic Table". Currently, Flink queries can only be executed on Dynamic
> > Tables.
> >
> >  > In essence, a materialized view represents the result of a query.
> >
> > Isn't that what your proposal does as well?
> >
> >  > the object of the suspend operation is the refresh task of the
> >  > dynamic table
> >
> > I understand that Snowflake uses the term [1] to merge their concepts of
> > STREAM, TASK, and TABLE into a single concept. But Flink has no
> > concept of a "refresh task". Also, they already introduced MATERIALIZED
> > VIEW. Flink is in the convenient position that the concept of
> > materialized views is not taken (reserved maybe for exactly this use
> > case?). And the SQL standard concept could be "slightly adapted" to our
> > needs. Looking at other vendors like Postgres[2], they also use
> > `REFRESH` commands, so why not add additional commands such as DELETE
> > or UPDATE? Oracle's "ON PREBUILT TABLE clause tells the database
> > to use an existing table segment"[3], which comes closer to what we want
> > as well.
> >
> >  > it is not intended to support data modification
> >
> > This is an argument that I understand. But we as Flink could allow data
> > modifications. This way we are only extending the standard and don't
> > introduce new concepts.
> >
> > If we can't agree on using the MATERIALIZED VIEW concept, we should fix
> > our syntax in a Flink 2.0 effort, making regular tables bounded and
> > dynamic tables unbounded. We would be closer to the SQL standard with this
> > and pave the way for the future. I would actually support this if all
> > concepts play together nicely.
> >
> >  > In the future, we can consider extending the statement set syntax to
> >  > support the creation of multiple dynamic tables.
> >
> > It's good that we called the concept STATEMENT SET. This allows us to
> > define CREATE TABLE within, even if it might look a bit confusing.
> >
> > Regards,
> > Timo
> >
> > [1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
> > [2] https://www.postgresql.org/docs/current/sql-creatematerializedview.html
> > [3] https://oracle-base.com/articles/misc/materialized-views
> >
> > On 21.03.24 04:14, Feng Jin wrote:
> > > Hi Ron and Lincoln
> > >
> > > Thanks for driving this discussion. I believe it will greatly improve
> > > the convenience of managing user real-time pipelines.
> > >
> > > I have some questions.
> > >
> > > *Regarding Limitations of Dynamic Table:*
> > >
> > >> Does not support modifying the select statement after the dynamic
> > >> table is created.
> > >
> > > Although currently we restrict users from modifying the query, I wonder
> > > if we can provide a better way to help users rebuild it without
> > > affecting downstream OLAP queries.
> > >
> > >
> > > *Regarding the management of background jobs:*
> > >
> > > 1. From the documentation, the definition SQL and job information are
> > > stored in the Catalog. Does this mean that if a system needs to adapt
> > > to Dynamic Tables, it also needs to store Flink's job information in
> > > the corresponding system?
> > > For example, does MySQL's Catalog need to store Flink job information
> > > as well?
> > >
> > >
> > > 2. Users still need to consider how much memory is being used, how
> > > large the concurrency is, which type of state backend is being used,
> > > and may need to set TTL expiration.
> > >
> > >
> > > *Regarding the Refresh Part:*
> > >
> > >> If the refresh mode is continuous and a background job is running,
> > >> caution should be taken with the refresh command as it can lead to
> > >> inconsistent data.
> > >
> > > When we submit a refresh command, can we help users detect if there
> > > are any running jobs and automatically stop them before executing the
> > > refresh command? Then wait for it to complete before restarting the
> > > background streaming job?
> > >
> > > Best,
> > > Feng
> > >
> > > On Tue, Mar 19, 2024 at 9:40 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > >
> > >> Hi Yun,
> > >>
> > >> Thank you very much for your valuable input!
> > >>
> > >> Incremental mode is indeed an attractive idea. We have also discussed
> > >> this, but in the current design we first provide two refresh modes:
> > >> CONTINUOUS and FULL. Incremental mode can be introduced once the
> > >> execution layer has the capability.
> > >>
> > >> My answers to the two questions:
> > >>
> > >> 1. Yes, cascading is a good question. The current proposal provides a
> > >> freshness that defines a dynamic table's lag relative to the base
> > >> table. If users need to consider the end-to-end freshness of multiple
> > >> cascaded dynamic tables, they can manually split it for now. Of
> > >> course, how to let multiple cascaded or dependent dynamic tables
> > >> complete the freshness definition in a simpler way is something that
> > >> I think can be extended in the future.
> > >>
> > >> 2. Cascading refresh is also a part we focused on in our discussions.
> > >> In this FLIP, we hope to focus as much as possible on the core features
> > >> (as it already involves a lot of things), so we did not directly
> > >> introduce related syntax. However, based on the current design,
> > >> combined with the catalog and lineage, users can theoretically also
> > >> accomplish the cascading refresh.
> > >>
> > >>
> > >> Best,
> > >> Lincoln Lee
> > >>
> > >>
> > >> On Tue, 19 Mar 2024 at 13:45, Yun Tang <myas...@live.com> wrote:
> > >>
> > >>> Hi Lincoln,
> > >>>
> > >>> Thanks for driving this discussion, and I am so excited to see this
> > >>> topic being discussed in the Flink community!
> > >>>
> > >>> From my point of view, instead of the work of unifying streaming and
> > >>> batch in DataStream API [1], this FLIP actually could make users
> > >>> benefit from one engine to rule batch & streaming.
> > >>>
> > >>> If we treat this FLIP as an open-source implementation of Snowflake's
> > >>> dynamic tables [2], we still lack an incremental refresh mode to make
> > >>> the ETL near real-time with a much cheaper computation cost. However, I
> > >>> think this could be done under the current design by introducing
> > >>> another refresh mode in the future, although the extra work of
> > >>> incremental view maintenance would be much larger.
> > >>>
> > >>> For the FLIP itself, I have several questions below:
> > >>>
> > >>> 1. It seems this FLIP does not consider the lag of refreshes across
> > >>> ETL layers from ODS ---> DWD ---> APP [3]. We currently only consider
> > >>> the scheduler interval, which means we cannot use lag to automatically
> > >>> schedule the upfront micro-batch jobs to do the work.
> > >>> 2. To support the automagical refreshes, we should consider the
> > >>> lineage in the catalog or somewhere else.
> > >>>
> > >>>
> > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API
> > >>> [2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
> > >>> [3] https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh
> > >>>
> > >>> Best
> > >>> Yun Tang
> > >>>
> > >>>
> > >>> ________________________________
> > >>> From: Lincoln Lee <lincoln.8...@gmail.com>
> > >>> Sent: Thursday, March 14, 2024 14:35
> > >>> To: dev@flink.apache.org <dev@flink.apache.org>
> > >>> Subject: Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for
> > >>> Simplifying Data Pipelines
> > >>>
> > >>> Hi Jing,
> > >>>
> > >>> Thanks for your attention to this FLIP! I'll try to answer the
> > >>> following questions.
> > >>>
> > >>>> 1. How to define query of dynamic table?
> > >>>> Use flink sql or introducing new syntax?
> > >>>> If use flink sql, how to handle the difference in SQL between
> > >>>> streaming and batch processing?
> > >>>> For example, a query including window aggregate based on processing
> > >>>> time?
> > >>>> or a query including global order by?
> > >>>
> > >>> Similar to `CREATE TABLE AS query`, here the `query` also uses Flink
> > >>> SQL and doesn't introduce a totally new syntax. We will not change the
> > >>> status quo with respect to the differences in functionality of Flink
> > >>> SQL itself between streaming and batch. For example, the proctime
> > >>> window agg on streaming and the global sort on batch that you mentioned
> > >>> do not, in fact, work properly in the other mode, so when the user
> > >>> changes a dynamic table to a refresh mode that is not supported, we
> > >>> will throw an exception.
> > >>>
> > >>>> 2. Whether modify the query of dynamic table is allowed?
> > >>>> Or we could only refresh a dynamic table based on the initial query?
> > >>>
> > >>> Yes, in the current design, the query definition of the dynamic table
> > >>> is not allowed to be modified, and you can only refresh the data based
> > >>> on the initial definition.
> > >>>
> > >>>> 3. How to use dynamic table?
> > >>>> The dynamic table seems to be similar to the materialized view. Will
> > >>>> we do something like materialized view rewriting during the
> > >>>> optimization?
> > >>>
> > >>> It's true that dynamic table and materialized view are similar in some
> > >>> ways, but as Ron explains, there are differences. In terms of
> > >>> optimization, automated materialization discovery similar to that
> > >>> supported by Calcite is also a potential possibility, perhaps with the
> > >>> addition of automated rewriting in the future.
> > >>>
> > >>>
> > >>>
> > >>> Best,
> > >>> Lincoln Lee
> > >>>
> > >>>
> > >>> On Thu, 14 Mar 2024 at 14:01, Ron liu <ron9....@gmail.com> wrote:
> > >>>
> > >>>> Hi, Timo
> > >>>>
> > >>>> Sorry for the late response, and thanks for your feedback.
> > >>>> Regarding your questions:
> > >>>>
> > >>>>> Flink has introduced the concept of Dynamic Tables many years ago.
> > >>>>> How does the term "Dynamic Table" fit into Flink's regular tables and
> > >>>>> also how does it relate to Table API?
> > >>>>
> > >>>>
> > >>>>> I fear that adding the DYNAMIC TABLE keyword could cause confusion
> > >>>>> for users, because a term for regular CREATE TABLE (that can be "kind
> > >>>>> of dynamic" as well and is backed by a changelog) is then missing.
> > >>>>> Also given that we call our connectors for those tables,
> > >>>>> DynamicTableSource and DynamicTableSink.
> > >>>>
> > >>>>
> > >>>>> In general, I find it contradicting that a TABLE can be "paused" or
> > >>>>> "resumed". From an English language perspective, this does sound
> > >>>>> incorrect. In my opinion (without much research yet), a continuous
> > >>>>> updating trigger should rather be modelled as a CREATE MATERIALIZED
> > >>>>> VIEW (which users are familiar with?) or a new concept such as a
> > >>>>> CREATE TASK (that can be paused and resumed?).
> > >>>>
> > >>>>
> > >>>> 1. In the current concept[1], it actually includes: Dynamic Tables &
> > >>>> Continuous Query. Dynamic Table is just an abstract logical concept,
> > >>>> which in its physical form represents either a table or a changelog
> > >>>> stream. It requires the combination with Continuous Query to achieve
> > >>>> dynamic updates of the target table similar to a database’s
> > >>>> Materialized View.
> > >>>> We hope to upgrade the Dynamic Table to a real entity that users can
> > >>>> operate, which combines the logical concepts of Dynamic Tables +
> > >>>> Continuous Query. By integrating the definition of tables and queries,
> > >>>> it can achieve functions similar to Materialized Views, simplifying
> > >>>> users' data processing pipelines.
> > >>>> So, the object of the suspend operation is the refresh task of the
> > >>>> dynamic table. The command `ALTER DYNAMIC TABLE table_name SUSPEND`
> > >>>> is actually a shorthand for `ALTER DYNAMIC TABLE table_name SUSPEND
> > >>>> REFRESH` (if written in full for clarity, we can also modify it).
> > >>>>
> > >>>> 2. Initially, we also considered Materialized Views, but ultimately
> > >>>> decided against them. Materialized views are designed to enhance
> > >>>> query performance for workloads that consist of common, repetitive
> > >>>> query patterns. In essence, a materialized view represents the result
> > >>>> of a query.
> > >>>> However, it is not intended to support data modification. For
> > >>>> Lakehouse scenarios, where the ability to delete or update data is
> > >>>> crucial (such as compliance with GDPR, FLIP-2), materialized views
> > >>>> fall short.
> > >>>>
> > >>>> 3. Compared to CREATE (regular) TABLE, CREATE DYNAMIC TABLE not only
> > >>>> defines metadata in the catalog but also automatically initiates a
> > >>>> data refresh task based on the query specified during table creation.
> > >>>> It dynamically executes data updates. Users can focus on data
> > >>>> dependencies and data generation logic.
> > >>>>
> > >>>> 4. The new dynamic table does not conflict with the existing
> > >>>> DynamicTableSource and DynamicTableSink interfaces. For the developer,
> > >>>> all that needs to be implemented is the new CatalogDynamicTable,
> > >>>> without changing the implementation of source and sink.
> > >>>>
> > >>>> 5. For now, the FLIP does not consider supporting Table API
> > >>>> operations on Dynamic Table. However, once the SQL syntax is
> > >>>> finalized, we can discuss this in a separate FLIP. Currently, I have a
> > >>>> rough idea: the Table API should also introduce DynamicTable operation
> > >>>> interfaces corresponding to the existing Table interfaces. The
> > >>>> TableEnvironment will provide relevant methods to support various
> > >>>> dynamic table operations. The goal for the new Dynamic Table is to
> > >>>> offer users an experience similar to using a database, which is why we
> > >>>> prioritize SQL-based approaches initially.
> > >>>>
> > >>>>> How do you envision re-adding the functionality of a statement set,
> > >>>>> that fans out to multiple tables? This is a very important use case
> > >>>>> for data pipelines.
> > >>>>
> > >>>>
> > >>>> Multi-tables is indeed a very important user scenario. In the future,
> > >>>> we can consider extending the statement set syntax to support the
> > >>>> creation of multiple dynamic tables.
> > >>>>
> > >>>>
> > >>>>> Since the early days of Flink SQL, we were discussing `SELECT STREAM
> > >>>>> * FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and
> > >>>>> EMIT, into other keywords DYNAMIC TABLE and FRESHNESS. But the core
> > >>>>> functionality is still there. I'm wondering if we should widen the
> > >>>>> scope (maybe not part of this FLIP but a new FLIP) to follow the
> > >>>>> standard more closely. Making `SELECT * FROM t` bounded by default and
> > >>>>> use new syntax for the dynamic behavior. Flink 2.0 would be the
> > >>>>> perfect time for this, however, it would require careful discussions.
> > >>>>> What do you think?
> > >>>>
> > >>>>
> > >>>> The query part indeed requires a separate FLIP for discussion, as it
> > >>>> involves changes to the default behavior.
> > >>>>
> > >>>>
> > >>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables
> > >>>>
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Ron
> > >>>>
> > >>>>
> > >>>> On Wed, 13 Mar 2024 at 15:19, Jing Zhang <beyond1...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi, Lincoln & Ron,
> > >>>>>
> > >>>>> Thanks for the proposal.
> > >>>>>
> > >>>>> I agree with the question raised by Timo.
> > >>>>>
> > >>>>> Besides, I have some other questions.
> > >>>>> 1. How to define query of dynamic table?
> > >>>>> Use flink sql or introducing new syntax?
> > >>>>> If use flink sql, how to handle the difference in SQL between
> > >>>>> streaming and batch processing?
> > >>>>> For example, a query including window aggregate based on processing
> > >>>>> time?
> > >>>>> or a query including global order by?
> > >>>>>
> > >>>>> 2. Whether modify the query of dynamic table is allowed?
> > >>>>> Or we could only refresh a dynamic table based on initial query?
> > >>>>>
> > >>>>> 3. How to use dynamic table?
> > >>>>> The dynamic table seems to be similar to the materialized view. Will
> > >>>>> we do something like materialized view rewriting during the
> > >>>>> optimization?
> > >>>>>
> > >>>>> Best,
> > >>>>> Jing Zhang
> > >>>>>
> > >>>>>
> > >>>>> On Wed, 13 Mar 2024 at 01:24, Timo Walther <twal...@apache.org> wrote:
> > >>>>>
> > >>>>>> Hi Lincoln & Ron,
> > >>>>>>
> > >>>>>> thanks for proposing this FLIP. I think a design similar to what you
> > >>>>>> propose has been in the heads of many people, however, I'm wondering
> > >>>>>> how this will fit into the bigger picture.
> > >>>>>>
> > >>>>>> I haven't deeply reviewed the FLIP yet, but would like to ask some
> > >>>>>> initial questions:
> > >>>>>>
> > >>>>>> Flink has introduced the concept of Dynamic Tables many years ago.
> > >>>>>> How does the term "Dynamic Table" fit into Flink's regular tables and
> > >>>>>> also how does it relate to Table API?
> > >>>>>>
> > >>>>>> I fear that adding the DYNAMIC TABLE keyword could cause confusion
> > >>>>>> for users, because a term for regular CREATE TABLE (that can be "kind
> > >>>>>> of dynamic" as well and is backed by a changelog) is then missing.
> > >>>>>> Also given that we call our connectors for those tables,
> > >>>>>> DynamicTableSource and DynamicTableSink.
> > >>>>>>
> > >>>>>> In general, I find it contradicting that a TABLE can be "paused" or
> > >>>>>> "resumed". From an English language perspective, this does sound
> > >>>>>> incorrect. In my opinion (without much research yet), a continuous
> > >>>>>> updating trigger should rather be modelled as a CREATE MATERIALIZED
> > >>>>>> VIEW (which users are familiar with?) or a new concept such as a
> > >>>>>> CREATE TASK (that can be paused and resumed?).
> > >>>>>>
> > >>>>>> How do you envision re-adding the functionality of a statement set,
> > >>>>>> that fans out to multiple tables? This is a very important use case
> > >>>>>> for data pipelines.
> > >>>>>>
> > >>>>>> Since the early days of Flink SQL, we were discussing `SELECT STREAM
> > >>>>>> * FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and
> > >>>>>> EMIT, into other keywords DYNAMIC TABLE and FRESHNESS. But the core
> > >>>>>> functionality is still there. I'm wondering if we should widen the
> > >>>>>> scope (maybe not part of this FLIP but a new FLIP) to follow the
> > >>>>>> standard more closely. Making `SELECT * FROM t` bounded by default
> > >>>>>> and use new syntax for the dynamic behavior. Flink 2.0 would be the
> > >>>>>> perfect time for this, however, it would require careful discussions.
> > >>>>>> What do you think?
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Timo
> > >>>>>>
> > >>>>>>
> > >>>>>> On 11.03.24 08:23, Ron liu wrote:
> > >>>>>>> Hi, Dev
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Lincoln Lee and I would like to start a discussion about FLIP-435:
> > >>>>>>> Introduce a New Dynamic Table for Simplifying Data Pipelines.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> This FLIP is designed to simplify the development of data
> > >>>>>>> processing pipelines. With Dynamic Tables with uniform SQL
> > >>>>>>> statements and freshness, users can define batch and streaming
> > >>>>>>> transformations to data in the same way, accelerate ETL pipeline
> > >>>>>>> development, and manage task scheduling automatically.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> For more details, see FLIP-435 [1]. Looking forward to your
> > >>>>>>> feedback.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> [1]
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>>
> > >>>>>>> Lincoln & Ron
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>
