Hi Lincoln,

Thanks for driving this discussion, and I am so excited to see this topic being 
discussed in the Flink community!

From my point of view, compared with the work of unifying streaming and batch 
in the DataStream API [1], this FLIP could actually let users benefit from one 
engine that rules both batch and streaming.

If we treat this FLIP as an open-source implementation of Snowflake's dynamic 
tables [2], we still lack an incremental refresh mode that makes the ETL near 
real-time at a much cheaper computation cost. However, I think this could be 
done under the current design by introducing another refresh mode in the 
future, although the extra work of incremental view maintenance would be much 
larger.

For the FLIP itself, I have several questions below:

1. It seems this FLIP does not consider the lag of refreshes across ETL layers 
from ODS ---> DWD ---> APP [3]. We currently consider only the scheduler 
interval, which means we cannot use lag to automatically schedule the upstream 
micro-batch jobs that do the work.
2. To support the automagical refreshes, we should consider the lineage in the 
catalog or somewhere else.
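
As a rough illustration of both points, here is a minimal Python sketch. It
assumes a hypothetical lineage map kept in the catalog; the table names, the
`lineage` dict, and the `refresh_order` and `effective_lag` helpers are all
illustrative and not part of the FLIP. It only shows how lineage could give an
upstream-first refresh order, and how a downstream lag could tighten the lag
of the layers it depends on:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical lineage: each dynamic table maps to the tables it reads from.
lineage = {
    "dwd_orders": {"ods_orders"},
    "app_daily_sales": {"dwd_orders"},
}

# Hypothetical declared lag per dynamic table, in seconds.
declared_lag = {"dwd_orders": 3600, "app_daily_sales": 300}


def refresh_order(lineage):
    """Return tables so that every upstream table precedes its consumers."""
    return list(TopologicalSorter(lineage).static_order())


def effective_lag(lineage, declared_lag):
    """Tighten each table's lag so it never exceeds that of any consumer."""
    effective = dict(declared_lag)
    # Visit consumers before producers (reverse topological order).
    for table in reversed(refresh_order(lineage)):
        if table not in effective:
            continue  # plain source table without a declared lag
        for upstream in lineage.get(table, ()):
            if upstream in effective:
                effective[upstream] = min(effective[upstream], effective[table])
    return effective
```

With this input, `dwd_orders` would inherit the 300-second lag of
`app_daily_sales`, which is the kind of scheduling a plain per-table interval
cannot express.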


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API
[2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
[3] https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh

Best
Yun Tang


________________________________
From: Lincoln Lee <lincoln.8...@gmail.com>
Sent: Thursday, March 14, 2024 14:35
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying 
Data Pipelines

Hi Jing,

Thanks for your attention to this FLIP! I'll try to answer the following
questions.

> 1. How to define query of dynamic table?
> Use flink sql or introducing new syntax?
> If use flink sql, how to handle the difference in SQL between streaming and
> batch processing?
> For example, a query including window aggregate based on processing time?
> or a query including global order by?

Similar to `CREATE TABLE AS query`, the `query` here also uses Flink SQL and
does not introduce a totally new syntax. We will not change the status quo
with respect to the functional differences of Flink SQL itself between
streaming and batch. For example, the proctime window aggregation on streaming
and the global sort on batch that you mentioned do not, in fact, work properly
in the other mode, so when the user changes the refresh mode of a dynamic
table to one that its query does not support, we will throw an exception.
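
A minimal Python sketch of that check, under stated assumptions: the mode
names `CONTINUOUS` and `FULL`, the feature tags, and the
`validate_refresh_mode` helper are hypothetical and only illustrate the
behavior described above, not the actual implementation:

```python
from enum import Enum


class RefreshMode(Enum):
    CONTINUOUS = "continuous"  # streaming-style refresh (assumed name)
    FULL = "full"              # batch-style refresh (assumed name)


# Hypothetical feature tags that a planner might attach to a query,
# grouped by the refresh mode in which they do not work properly.
UNSUPPORTED_FEATURES = {
    RefreshMode.FULL: {"proctime_window_agg"},
    RefreshMode.CONTINUOUS: {"global_order_by"},
}


class UnsupportedRefreshModeError(ValueError):
    """Raised when a query cannot run in the requested refresh mode."""


def validate_refresh_mode(query_features, target_mode):
    """Reject a refresh mode change that the query definition cannot support."""
    blocked = UNSUPPORTED_FEATURES[target_mode] & set(query_features)
    if blocked:
        raise UnsupportedRefreshModeError(
            f"{sorted(blocked)} not supported in refresh mode {target_mode.name}"
        )
```

For example, a query tagged with `global_order_by` would pass validation for
`RefreshMode.FULL` and be rejected for `RefreshMode.CONTINUOUS`.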

> 2. Whether modify the query of dynamic table is allowed?
> Or we could only refresh a dynamic table based on the initial query?

Yes, in the current design, the query definition of a dynamic table cannot be
modified, and you can only refresh the data based on the initial definition.

> 3. How to use dynamic table?
> The dynamic table seems to be similar to the materialized view.  Will we do
> something like materialized view rewriting during the optimization?

It's true that dynamic tables and materialized views are similar in some ways,
but as Ron explains, there are differences. In terms of optimization,
automated materialization discovery similar to that supported by Calcite is
also a possibility, perhaps with the addition of automated rewriting in the
future.



Best,
Lincoln Lee


Ron liu <ron9....@gmail.com> wrote on Thursday, March 14, 2024 at 14:01:

> Hi, Timo
>
> Sorry for the late response, and thanks for your feedback.
> Regarding your questions:
>
> > Flink has introduced the concept of Dynamic Tables many years ago. How
> > does the term "Dynamic Table" fit into Flink's regular tables and also
> > how does it relate to Table API?
>
>
> > I fear that adding the DYNAMIC TABLE keyword could cause confusion for
> > users, because a term for regular CREATE TABLE (that can be "kind of
> > dynamic" as well and is backed by a changelog) is then missing. Also
> > given that we call our connectors for those tables, DynamicTableSource
> > and DynamicTableSink.
>
>
> > In general, I find it contradicting that a TABLE can be "paused" or
> > "resumed". From an English language perspective, this does sound
> > incorrect. In my opinion (without much research yet), a continuous
> > updating trigger should rather be modelled as a CREATE MATERIALIZED VIEW
> > (which users are familiar with?) or a new concept such as a CREATE TASK
> > (that can be paused and resumed?).
>
>
> 1. The current concept [1] actually includes both Dynamic Tables and
> Continuous Queries. A Dynamic Table is just an abstract logical concept,
> which in its physical form represents either a table or a changelog
> stream. It requires the combination with a Continuous Query to achieve
> dynamic updates of the target table, similar to a database’s
> Materialized View.
> We hope to upgrade the Dynamic Table to a real entity that users can
> operate, which combines the logical concepts of Dynamic Tables +
> Continuous Query. By integrating the definition of tables and queries,
> it can achieve functions similar to Materialized Views, simplifying
> users' data processing pipelines.
> So, the object of the suspend operation is the refresh task of the
> dynamic table. The command `ALTER DYNAMIC TABLE table_name SUSPEND`
> is actually shorthand for `ALTER DYNAMIC TABLE table_name SUSPEND REFRESH`
> (if the full form is clearer, we can also change the syntax to it).
>
> 2. Initially, we also considered Materialized Views, but ultimately
> decided against them. Materialized views are designed
> to enhance query performance for workloads that consist of common,
> repetitive query patterns. In essence, a materialized view represents
> the result of a query.
> However, it is not intended to support data modification. For
> Lakehouse scenarios, where the ability to delete or update data is
> crucial (such as compliance with GDPR, FLIP-2), materialized views
> fall short.
>
> 3. Compared to CREATE (regular) TABLE, CREATE DYNAMIC TABLE not only
> defines metadata in the catalog but also automatically initiates a
> data refresh task based on the query specified during table creation.
> It dynamically executes data updates. Users can focus on data
> dependencies and data generation logic.
>
> 4. The new dynamic table does not conflict with the existing
> DynamicTableSource and DynamicTableSink interfaces. For the developer,
> all that needs to be implemented is the new CatalogDynamicTable,
> without changing the implementation of source and sink.
>
> 5. For now, the FLIP does not consider supporting Table API operations on
> Dynamic Tables. However, once the SQL syntax is finalized, we can discuss
> this in a separate FLIP. Currently, I have a rough idea: the Table API
> should also introduce DynamicTable operation interfaces corresponding to
> the existing Table interfaces. The TableEnvironment will provide relevant
> methods to support various dynamic table operations. The goal for the new
> Dynamic Table is to offer users an experience similar to using a database,
> which is why we prioritize SQL-based approaches initially.
>
> > How do you envision re-adding the functionality of a statement set, that
> > fans out to multiple tables? This is a very important use case for data
> > pipelines.
>
>
> Fanning out to multiple tables is indeed a very important user scenario.
> In the future, we can consider extending the statement set syntax to
> support the creation of multiple dynamic tables.
>
>
> > Since the early days of Flink SQL, we were discussing `SELECT STREAM *
> > FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and EMIT,
> > into other keywords DYNAMIC TABLE and FRESHNESS. But the core
> > functionality is still there. I'm wondering if we should widen the scope
> > (maybe not part of this FLIP but a new FLIP) to follow the standard more
> > closely. Making `SELECT * FROM t` bounded by default and use new syntax
> > for the dynamic behavior. Flink 2.0 would be the perfect time for this,
> > however, it would require careful discussions. What do you think?
>
>
> The query part indeed requires a separate FLIP for discussion, as it
> involves changes to the default behavior.
>
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables
>
>
> Best,
>
> Ron
>
>
> Jing Zhang <beyond1...@gmail.com> wrote on Wednesday, March 13, 2024 at 15:19:
>
> > Hi, Lincoln & Ron,
> >
> > Thanks for the proposal.
> >
> > I agree with the question raised by Timo.
> >
> > Besides, I have some other questions.
> > 1. How to define query of dynamic table?
> > Use flink sql or introducing new syntax?
> > If use flink sql, how to handle the difference in SQL between streaming and
> > batch processing?
> > For example, a query including window aggregate based on processing time?
> > or a query including global order by?
> >
> > 2. Whether modify the query of dynamic table is allowed?
> > Or we could only refresh a dynamic table based on initial query?
> >
> > 3. How to use dynamic table?
> > The dynamic table seems to be similar with materialized view.  Will we do
> > something like materialized view rewriting during the optimization?
> >
> > Best,
> > Jing Zhang
> >
> >
> > Timo Walther <twal...@apache.org> wrote on Wednesday, March 13, 2024 at 01:24:
> >
> > > Hi Lincoln & Ron,
> > >
> > > thanks for proposing this FLIP. I think a design similar to what you
> > > propose has been in the heads of many people, however, I'm wondering how
> > > this will fit into the bigger picture.
> > >
> > > I haven't deeply reviewed the FLIP yet, but would like to ask some
> > > initial questions:
> > >
> > > Flink has introduced the concept of Dynamic Tables many years ago. How
> > > does the term "Dynamic Table" fit into Flink's regular tables and also
> > > how does it relate to Table API?
> > >
> > > I fear that adding the DYNAMIC TABLE keyword could cause confusion for
> > > users, because a term for regular CREATE TABLE (that can be "kind of
> > > dynamic" as well and is backed by a changelog) is then missing. Also
> > > given that we call our connectors for those tables, DynamicTableSource
> > > and DynamicTableSink.
> > >
> > > In general, I find it contradicting that a TABLE can be "paused" or
> > > "resumed". From an English language perspective, this does sound
> > > incorrect. In my opinion (without much research yet), a continuous
> > > updating trigger should rather be modelled as a CREATE MATERIALIZED VIEW
> > > (which users are familiar with?) or a new concept such as a CREATE TASK
> > > (that can be paused and resumed?).
> > >
> > > How do you envision re-adding the functionality of a statement set, that
> > > fans out to multiple tables? This is a very important use case for data
> > > pipelines.
> > >
> > > Since the early days of Flink SQL, we were discussing `SELECT STREAM *
> > > FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and EMIT,
> > > into other keywords DYNAMIC TABLE and FRESHNESS. But the core
> > > functionality is still there. I'm wondering if we should widen the scope
> > > (maybe not part of this FLIP but a new FLIP) to follow the standard more
> > > closely. Making `SELECT * FROM t` bounded by default and use new syntax
> > > for the dynamic behavior. Flink 2.0 would be the perfect time for this,
> > > however, it would require careful discussions. What do you think?
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 11.03.24 08:23, Ron liu wrote:
> > > > Hi, Dev
> > > >
> > > >
> > > > Lincoln Lee and I would like to start a discussion about FLIP-435:
> > > > Introduce a New Dynamic Table for Simplifying Data Pipelines.
> > > >
> > > >
> > > > This FLIP is designed to simplify the development of data processing
> > > > pipelines. With Dynamic Tables with uniform SQL statements and
> > > > freshness, users can define batch and streaming transformations to
> > > > data in the same way, accelerate ETL pipeline development, and manage
> > > > task scheduling automatically.
> > > >
> > > >
> > > > For more details, see FLIP-435 [1]. Looking forward to your feedback.
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > > > Best,
> > > >
> > > > Lincoln & Ron
> > > >
> > >
> > >
> >
>
