After thinking about this more, this discussion boils down to whether this is a special table or a special materialized view. In both cases, we would need to add a special keyword:

Either

CREATE UPDATING TABLE

or

CREATE UPDATING MATERIALIZED VIEW

I still feel that the periodic refreshing behavior is closer to a MV. If we add a special keyword to MV, the optimizer would know that the data cannot be used for query optimizations.

I will ask more people for their opinion.

Regards,
Timo


On 25.03.24 10:45, Timo Walther wrote:
Hi Ron and Lincoln,

thanks for the quick response and the very insightful discussion.

 > we might limit future opportunities to optimize queries
 > through automatic materialization rewriting by allowing data
 > modifications, thus losing the potential for such optimizations.

This argument makes a lot of sense to me. Due to the updates, the system is not in full control of the persisted data. However, the system is still in full control of the job that powers the refresh. So if the system manages all updating pipelines, it could still leverage automatic materialization rewriting but without leveraging the data at rest (only the data in flight).

 > we are considering another candidate, Derived Table, the term 'derive'
 > suggests a query, and 'table' retains modifiability. This approach
 > would not disrupt our current concept of a dynamic table

I did some research on this term. The SQL standard uses the term "derived table" extensively (defined in section 4.17.3). Thus, a lot of vendors adopt this for simply referring to a table within a subclause:

https://dev.mysql.com/doc/refman/8.0/en/derived-tables.html

https://infocenter.sybase.com/help/topic/com.sybase.infocenter.dc32300.1600/doc/html/san1390612291252.html

https://www.c-sharpcorner.com/article/derived-tables-vs-common-table-expressions/

https://stackoverflow.com/questions/26529804/what-are-the-derived-tables-in-my-explain-statement

https://www.sqlservercentral.com/articles/sql-derived-tables

Esp. the latter example is interesting, SQL Server allows things like this on derived tables:

UPDATE T SET Name='Timo' FROM (SELECT * FROM Product) AS T

SELECT * FROM Product;

Btw also Snowflake's dynamic table state:

 > Because the content of a dynamic table is fully determined
 > by the given query, the content cannot be changed by using DML.
 > You don’t insert, update, or delete the rows in a dynamic table.

So a new term makes a lot of sense.

How about using `UPDATING`?

CREATE UPDATING TABLE

This reflects that modifications can be made and from an English-language perspective you can PAUSE or RESUME the UPDATING.
Thus, a user can define UPDATING interval and mode?

Looking forward to your thoughts.

Regards,
Timo


On 25.03.24 07:09, Ron liu wrote:
Hi, Ahmed

Thanks for your feedback.

Regarding your question:

I want to iterate on Timo's comments regarding the confusion between
"Dynamic Table" and current Flink "Table". Should the refactoring of the
system happen in 2.0, should we rename it in this Flip ( as the suggestions in the thread ) and address the holistic changes in a separate Flip for 2.0?

Lincoln proposed a new concept in reply to Timo: Derived Table, which is a
combination of Dynamic Table + Continuous Query, and the use of Derived
Table will not conflict with existing concepts, what do you think?

I feel confused with how it is further with other components, the
examples provided feel like a standalone ETL job, could you provide in the
FLIP an example where the table is further used in subsequent queries
(specially in batch mode).

Thanks for your suggestion, I added how to use Dynamic Table in FLIP user
story section, Dynamic Table can be referenced by downstream Dynamic Table
and can also support OLAP queries.

Best,
Ron

Ron liu <ron9....@gmail.com> 于2024年3月23日周六 10:35写道:

Hi, Feng

Thanks for your feedback.

Although currently we restrict users from modifying the query, I wonder
if
we can provide a better way to help users rebuild it without affecting
downstream OLAP queries.

Considering the problem of data consistency, so in the first step we are
strictly limited in semantics and do not support modify the query. This is really a good problem, one of my ideas is to introduce a syntax similar to
SWAP [1], which supports exchanging two Dynamic Tables.

 From the documentation, the definitions SQL and job information are
stored in the Catalog. Does this mean that if a system needs to adapt to
Dynamic Tables, it also needs to store Flink's job information in the
corresponding system?
For example, does MySQL's Catalog need to store flink job information as
well?

Yes, currently we need to rely on Catalog to store refresh job information.

Users still need to consider how much memory is being used, how large
the concurrency is, which type of state backend is being used, and may need
to set TTL expiration.

Similar to the current practice, job parameters can be set via the Flink
conf or SET commands

When we submit a refresh command, can we help users detect if there are
any
running jobs and automatically stop them before executing the refresh
command? Then wait for it to complete before restarting the background
streaming job?

Purely from a technical implementation point of view, your proposal is
doable, but it would be more costly. Also I think data consistency itself is the responsibility of the user, similar to how Regular Table is now also the responsibility of the user, so it's consistent with its behavior and no
additional guarantees are made at the engine level.

Best,
Ron


Ahmed Hamdy <hamdy10...@gmail.com> 于2024年3月22日周五 23:50写道:

Hi Ron,
Sorry for joining the discussion late, thanks for the effort.

I think the base idea is great, however I have a couple of comments:
- I want to iterate on Timo's comments regarding the confusion between
"Dynamic Table" and current Flink "Table". Should the refactoring of the
system happen in 2.0, should we rename it in this Flip ( as the
suggestions
in the thread ) and address the holistic changes in a separate Flip for
2.0?
- I feel confused with how it is further with other components, the
examples provided feel like a standalone ETL job, could you provide in the
FLIP an example where the table is further used in subsequent queries
(specially in batch mode).
- I really like the standard of keeping the unified batch and streaming
approach
Best Regards
Ahmed Hamdy


On Fri, 22 Mar 2024 at 12:07, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Timo,

Thanks for your thoughtful inputs!

Yes, expanding the MATERIALIZED VIEW(MV) could achieve the same
function,
but our primary concern is that by using a view, we might limit future
opportunities
to optimize queries through automatic materialization rewriting [1],
leveraging
the support for MV by physical storage. This is because we would be
breaking
the intuitive semantics of a materialized view (a materialized view
represents
the result of a query) by allowing data modifications, thus losing the
potential
for such optimizations.

With these considerations in mind, we were inspired by Google Looker's
Persistent
Derived Table [2]. PDT is designed for building Looker's automated
modeling,
aligning with our purpose for the stream-batch automatic pipeline.
Therefore,
we are considering another candidate, Derived Table, the term 'derive'
suggests a
query, and 'table' retains modifiability. This approach would not
disrupt
our current
concept of a dynamic table, preserving the future utility of MVs.

Conceptually, a Derived Table is a Dynamic Table + Continuous Query. By
introducing
  a new concept Derived Table for this FLIP, this makes all concepts to
play
together nicely.

What do you think about this?

[1] https://calcite.apache.org/docs/materialized_views.html
[2]


https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables


Best,
Lincoln Lee


Timo Walther <twal...@apache.org> 于2024年3月22日周五 17:54写道:

Hi Ron,

thanks for the detailed answer. Sorry, for my late reply, we had a
conference that kept me busy.

  > In the current concept[1], it actually includes: Dynamic Tables &
  > & Continuous Query. Dynamic Table is just an abstract logical
concept

This explanation makes sense to me. But the docs also say "A
continuous
query is evaluated on the dynamic table yielding a new dynamic
table.".
So even our regular CREATE TABLEs are considered dynamic tables. This
can also be seen in the diagram "Dynamic Table -> Continuous Query ->
Dynamic Table". Currently, Flink queries can only be executed on
Dynamic
Tables.

  > In essence, a materialized view represents the result of a query.

Isn't that what your proposal does as well?

  > the object of the suspend operation is the refresh task of the
dynamic table

I understand that Snowflake uses the term [1] to merge their concepts
of
STREAM, TASK, and TABLE into one piece of concept. But Flink has no
concept of a "refresh task". Also, they already introduced
MATERIALIZED
VIEW. Flink is in the convenient position that the concept of
materialized views is not taken (reserved maybe for exactly this use
case?). And SQL standard concept could be "slightly adapted" to our
needs. Looking at other vendors like Postgres[2], they also use
`REFRESH` commands so why not adding additional commands such as
DELETE
or UPDATE. Oracle supports  "ON PREBUILT TABLE clause tells the
database
to use an existing table segment"[3] which comes closer to what we
want
as well.

  > it is not intended to support data modification

This is an argument that I understand. But we as Flink could allow
data
modifications. This way we are only extending the standard and don't
introduce new concepts.

If we can't agree on using MATERIALIZED VIEW concept. We should fix
our
syntax in a Flink 2.0 effort. Making regular tables bounded and
dynamic
tables unbounded. We would be closer to the SQL standard with this and
pave the way for the future. I would actually support this if all
concepts play together nicely.

  > In the future, we can consider extending the statement set syntax
to
support the creation of multiple dynamic tables.

It's good that we called the concept STATEMENT SET. This allows us to
defined CREATE TABLE within. Even if it might look a bit confusing.

Regards,
Timo

[1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
[2]

https://www.postgresql.org/docs/current/sql-creatematerializedview.html
[3] https://oracle-base.com/articles/misc/materialized-views

On 21.03.24 04:14, Feng Jin wrote:
Hi Ron and Lincoln

Thanks for driving this discussion.  I believe it will greatly
improve
the
convenience of managing user real-time pipelines.

I have some questions.

*Regarding Limitations of Dynamic Table:*

Does not support modifying the select statement after the dynamic
table
is created.

Although currently we restrict users from modifying the query, I
wonder
if
we can provide a better way to help users rebuild it without
affecting
downstream OLAP queries.


*Regarding the management of background jobs:*

1. From the documentation, the definitions SQL and job information
are
stored in the Catalog. Does this mean that if a system needs to
adapt
to
Dynamic Tables, it also needs to store Flink's job information in
the
corresponding system?
For example, does MySQL's Catalog need to store flink job
information
as
well?


2. Users still need to consider how much memory is being used, how
large
the concurrency is, which type of state backend is being used, and
may
need
to set TTL expiration.


*Regarding the Refresh Part:*

If the refresh mode is continuous and a background job is running,
caution should be taken with the refresh command as it can lead to
inconsistent data.

When we submit a refresh command, can we help users detect if there
are
any
running jobs and automatically stop them before executing the
refresh
command? Then wait for it to complete before restarting the
background
streaming job?

Best,
Feng

On Tue, Mar 19, 2024 at 9:40 PM Lincoln Lee <lincoln.8...@gmail.com

wrote:

Hi Yun,

Thank you very much for your valuable input!

Incremental mode is indeed an attractive idea, we have also
discussed
this, but in the current design,

we first provided two refresh modes: CONTINUOUS and
FULL. Incremental mode can be introduced

once the execution layer has the capability.

My answer for the two questions:

1.
Yes, cascading is a good question.  Current proposal provides a
freshness that defines a dynamic
table relative to the base table’s lag. If users need to consider
the
end-to-end freshness of multiple
cascaded dynamic tables, he can manually split them for now. Of
course, how to let multiple cascaded
   or dependent dynamic tables complete the freshness definition in
a
simpler way, I think it can be
extended in the future.

2.
Cascading refresh is also a part we focus on discussing. In this
flip,
we hope to focus as much as
possible on the core features (as it already involves a lot
things),
so we did not directly introduce related
   syntax. However, based on the current design, combined with the
catalog and lineage, theoretically,
users can also finish the cascading refresh.


Best,
Lincoln Lee


Yun Tang <myas...@live.com> 于2024年3月19日周二 13:45写道:

Hi Lincoln,

Thanks for driving this discussion, and I am so excited to see
this
topic
being discussed in the Flink community!

  From my point of view, instead of the work of unifying streaming
and
batch
in DataStream API [1], this FLIP actually could make users benefit
from
one
engine to rule batch & streaming.

If we treat this FLIP as an open-source implementation of
Snowflake's
dynamic tables [2], we still lack an incremental refresh mode to
make
the
ETL near real-time with a much cheaper computation cost. However,
I
think
this could be done under the current design by introducing another
refresh
mode in the future. Although the extra work of incremental view
maintenance
would be much larger.

For the FLIP itself, I have several questions below:

1. It seems this FLIP does not consider the lag of refreshes
across
ETL
layers from ODS ---> DWD ---> APP [3]. We currently only consider
the
scheduler interval, which means we cannot use lag to automatically
schedule
the upfront micro-batch jobs to do the work.
2. To support the automagical refreshes, we should consider the
lineage
in
the catalog or somewhere else.


[1]




https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API
[2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
[3]
https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh

Best
Yun Tang


________________________________
From: Lincoln Lee <lincoln.8...@gmail.com>
Sent: Thursday, March 14, 2024 14:35
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for
Simplifying Data Pipelines

Hi Jing,

Thanks for your attention to this flip! I'll try to answer the
following
questions.

1. How to define query of dynamic table?
Use flink sql or introducing new syntax?
If use flink sql, how to handle the difference in SQL between
streaming
and
batch processing?
For example, a query including window aggregate based on
processing
time?
or a query including global order by?

Similar to `CREATE TABLE AS query`, here the `query` also uses
Flink
sql
and

doesn't introduce a totally new syntax.
We will not change the status respect to

the difference in functionality of flink sql itself on streaming
and
batch, for example,

the proctime window agg on streaming and global sort on batch that
you
mentioned,

in fact, do not work properly in the
other mode, so when the user modifies the

refresh mode of a dynamic table that is not supported, we will
throw
an
exception.

2. Whether modify the query of dynamic table is allowed?
Or we could only refresh a dynamic table based on the initial
query?

Yes, in the current design, the query definition of the
dynamic table is not allowed

   to be modified, and you can only refresh the data based on the
initial definition.

3. How to use dynamic table?
The dynamic table seems to be similar to the materialized view.
Will
we
do
something like materialized view rewriting during the
optimization?

It's true that dynamic table and materialized view
are similar in some ways, but as Ron

explains
there are differences. In terms of optimization, automated
materialization discovery

similar to that supported by calcite is also a potential
possibility,
perhaps with the

addition of automated rewriting in the future.



Best,
Lincoln Lee


Ron liu <ron9....@gmail.com> 于2024年3月14日周四 14:01写道:

Hi, Timo

Sorry for later response,  thanks for your feedback.
Regarding your questions:

Flink has introduced the concept of Dynamic Tables many years
ago.
How

does the term "Dynamic Table" fit into Flink's regular tables and
also

how does it relate to Table API?


I fear that adding the DYNAMIC TABLE keyword could cause
confusion
for
users, because a term for regular CREATE TABLE (that can be
"kind
of
dynamic" as well and is backed by a changelog) is then missing.
Also
given that we call our connectors for those tables,
DynamicTableSource
and DynamicTableSink.


In general, I find it contradicting that a TABLE can be
"paused" or
"resumed". From an English language perspective, this does sound
incorrect. In my opinion (without much research yet), a
continuous
updating trigger should rather be modelled as a CREATE
MATERIALIZED
VIEW
(which users are familiar with?) or a new concept such as a
CREATE
TASK
(that can be paused and resumed?).


1.
In the current concept[1], it actually includes: Dynamic Tables &
Continuous Query. Dynamic Table is just an abstract
logical concept
, which in its physical form represents either a table or a
changelog
stream. It requires the combination with Continuous Query to
achieve
dynamic updates of the target table similar to a database’s
Materialized View.
We hope to upgrade the Dynamic Table to a real entity that users
can
operate, which combines the logical concepts of Dynamic Tables +
Continuous Query. By integrating the definition of tables and
queries,
it can achieve functions similar to Materialized Views,
simplifying
users' data processing pipelines.
So, the object of the suspend operation is the refresh task of
the
dynamic table. The command  `ALTER DYNAMIC TABLE table_name
SUSPEND
`
is actually a shorthand for `ALTER DYNAMIC TABLE table_name
SUSPEND
REFRESH` (if written in full for clarity, we can also modify it).

   2. Initially, we also considered Materialized Views
, but ultimately decided against them. Materialized views are
designed
to enhance query performance for workloads that consist of
common,
repetitive query patterns. In essence, a materialized view
represents
the result of a query.
However, it is not intended to support data modification. For
Lakehouse scenarios, where the ability to delete or update data
is
crucial (such as compliance with GDPR, FLIP-2), materialized
views
fall short.

3.
Compared to CREATE (regular) TABLE, CREATE DYNAMIC TABLE not only
defines metadata in the catalog but also automatically initiates
a
data refresh task based on the query specified during table
creation.
It dynamically executes data updates. Users can focus on data
dependencies and data generation logic.

4.
The new dynamic table does not conflict with the existing
DynamicTableSource and DynamicTableSink interfaces. For the
developer,
all that needs to be implemented is the new CatalogDynamicTable,
without changing the implementation of source and sink.

5. For now, the FLIP does not consider supporting Table API
operations
on
Dynamic Table
. However, once the SQL syntax is finalized, we can discuss this
in
a
separate FLIP. Currently, I have a rough idea: the Table API
should
also introduce
DynamicTable operation interfaces
   corresponding to the existing Table interfaces.
The TableEnvironment
   will provide relevant methods to support various dynamic table
operations. The goal for the new Dynamic Table is to offer users
an
experience similar to using a database, which is why we
prioritize
SQL-based approaches initially.

How do you envision re-adding the functionality of a statement
set,
that
fans out to multiple tables? This is a very important use case
for
data
pipelines.


Multi-tables is indeed a very important user scenario. In the
future,
we can consider extending the statement set syntax to support the
creation of multiple dynamic tables.


Since the early days of Flink SQL, we were discussing `SELECT
STREAM
*
FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM
and
EMIT,
into other keywords DYNAMIC TABLE and FRESHNESS. But the core
functionality is still there. I'm wondering if we should widen
the
scope
(maybe not part of this FLIP but a new FLIP) to follow the
standard
more
closely. Making `SELECT * FROM t` bounded by default and use new
syntax
for the dynamic behavior. Flink 2.0 would be the perfect time
for
this,
however, it would require careful discussions. What do you
think?


The query part indeed requires a separate FLIP
for discussion, as it involves changes to the default behavior.


[1]






https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables


Best,

Ron


Jing Zhang <beyond1...@gmail.com> 于2024年3月13日周三 15:19写道:

Hi, Lincoln & Ron,

Thanks for the proposal.

I agree with the question raised by Timo.

Besides, I have some other questions.
1. How to define query of dynamic table?
Use flink sql or introducing new syntax?
If use flink sql, how to handle the difference in SQL between
streaming
and
batch processing?
For example, a query including window aggregate based on
processing
time?
or a query including global order by?

2. Whether modify the query of dynamic table is allowed?
Or we could only refresh a dynamic table based on initial query?

3. How to use dynamic table?
The dynamic table seems to be similar with materialized view.
Will
we
do
something like materialized view rewriting during the
optimization?

Best,
Jing Zhang


Timo Walther <twal...@apache.org> 于2024年3月13日周三 01:24写 道:

Hi Lincoln & Ron,

thanks for proposing this FLIP. I think a design similar to
what
you
propose has been in the heads of many people, however, I'm
wondering
how
this will fit into the bigger picture.

I haven't deeply reviewed the FLIP yet, but would like to ask
some
initial questions:

Flink has introduced the concept of Dynamic Tables many years
ago.
How
does the term "Dynamic Table" fit into Flink's regular tables
and
also
how does it relate to Table API?

I fear that adding the DYNAMIC TABLE keyword could cause
confusion
for
users, because a term for regular CREATE TABLE (that can be
"kind
of
dynamic" as well and is backed by a changelog) is then missing.
Also
given that we call our connectors for those tables,
DynamicTableSource
and DynamicTableSink.

In general, I find it contradicting that a TABLE can be
"paused"
or
"resumed". From an English language perspective, this does
sound
incorrect. In my opinion (without much research yet), a
continuous
updating trigger should rather be modelled as a CREATE
MATERIALIZED
VIEW
(which users are familiar with?) or a new concept such as a
CREATE
TASK
(that can be paused and resumed?).

How do you envision re-adding the functionality of a statement
set,
that
fans out to multiple tables? This is a very important use case
for
data
pipelines.

Since the early days of Flink SQL, we were discussing `SELECT
STREAM
*
FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM
and
EMIT,
into other keywords DYNAMIC TABLE and FRESHNESS. But the core
functionality is still there. I'm wondering if we should widen
the
scope
(maybe not part of this FLIP but a new FLIP) to follow the
standard
more
closely. Making `SELECT * FROM t` bounded by default and use
new
syntax
for the dynamic behavior. Flink 2.0 would be the perfect time
for
this,
however, it would require careful discussions. What do you
think?

Regards,
Timo


On 11.03.24 08:23, Ron liu wrote:
Hi, Dev


Lincoln Lee and I would like to start a discussion about
FLIP-435:
Introduce a  New Dynamic Table for Simplifying Data Pipelines.


This FLIP is designed to simplify the development of data
processing
pipelines. With Dynamic Tables with uniform SQL statements and
freshness, users can define batch and streaming
transformations
to
data in the same way, accelerate ETL pipeline development, and
manage
task scheduling automatically.


For more details, see FLIP-435 [1]. Looking forward to your
feedback.


[1]


Best,

Lincoln & Ron
















Reply via email to