Hi Timo,

Thanks for your thoughtful input!
Yes, expanding the MATERIALIZED VIEW (MV) could achieve the same function, but our primary concern is that by using a view, we might limit future opportunities to optimize queries through automatic materialized view rewriting [1], which leverages the physical storage's support for MVs. By allowing data modifications we would break the intuitive semantics of a materialized view (a materialized view represents the result of a query) and thus lose the potential for such optimizations.

With these considerations in mind, we were inspired by Google Looker's Persistent Derived Table (PDT) [2]. PDTs are designed for building Looker's automated modeling, which aligns with our purpose of a stream-batch automatic pipeline. Therefore, we are considering another candidate: Derived Table. The term 'derived' suggests a query, and 'table' retains modifiability. This approach would not disrupt our current concept of a dynamic table and preserves the future utility of MVs.

Conceptually, a Derived Table is a Dynamic Table + Continuous Query. Introducing the new Derived Table concept in this FLIP makes all the concepts play together nicely. What do you think about this?

[1] https://calcite.apache.org/docs/materialized_views.html
[2] https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables

Best,
Lincoln Lee


Timo Walther <twal...@apache.org> wrote on Fri, Mar 22, 2024 at 17:54:

> Hi Ron,
>
> thanks for the detailed answer. Sorry for my late reply, we had a
> conference that kept me busy.
>
> > In the current concept[1], it actually includes: Dynamic Tables &
> > Continuous Query. Dynamic Table is just an abstract logical concept
>
> This explanation makes sense to me. But the docs also say "A continuous
> query is evaluated on the dynamic table yielding a new dynamic table.".
> So even our regular CREATE TABLEs are considered dynamic tables. This
> can also be seen in the diagram "Dynamic Table -> Continuous Query ->
> Dynamic Table".
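To make the Derived Table proposal above more concrete, here is a rough sketch of what such a DDL could look like. This syntax is hypothetical: the table names, the FRESHNESS clause placement, and the DELETE example are illustrative assumptions, not finalized FLIP syntax.

```sql
-- Hypothetical: a derived table combines a dynamic table with a
-- continuous query, plus a freshness target for the refresh pipeline.
CREATE DERIVED TABLE dwd_orders
FRESHNESS = INTERVAL '3' MINUTE  -- acceptable lag relative to base tables
AS SELECT o.order_id, o.user_id, u.region, o.amount
   FROM ods_orders AS o
   JOIN ods_users AS u ON o.user_id = u.user_id;

-- Unlike a strict materialized view, the result stays modifiable,
-- e.g. for GDPR-style deletions:
DELETE FROM dwd_orders WHERE user_id = 42;
```

The last statement is the key difference from CREATE MATERIALIZED VIEW: the derived table's data remains modifiable after creation, which is why a separate concept avoids breaking MV semantics.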
> Currently, Flink queries can only be executed on Dynamic Tables.
>
> > In essence, a materialized view represents the result of a query.
>
> Isn't that what your proposal does as well?
>
> > the object of the suspend operation is the refresh task of the
> > dynamic table
>
> I understand that Snowflake uses the term [1] to merge their concepts of
> STREAM, TASK, and TABLE into one concept. But Flink has no concept of a
> "refresh task". Also, they already introduced MATERIALIZED VIEW. Flink
> is in the convenient position that the concept of materialized views is
> not taken (reserved maybe for exactly this use case?). And the SQL
> standard concept could be "slightly adapted" to our needs. Looking at
> other vendors like Postgres [2], they also use `REFRESH` commands, so why
> not add additional commands such as DELETE or UPDATE? Oracle supports
> "ON PREBUILT TABLE clause tells the database to use an existing table
> segment" [3], which comes closer to what we want as well.
>
> > it is not intended to support data modification
>
> This is an argument that I understand. But we as Flink could allow data
> modifications. This way we are only extending the standard and don't
> introduce new concepts.
>
> If we can't agree on using the MATERIALIZED VIEW concept, we should fix
> our syntax in a Flink 2.0 effort, making regular tables bounded and
> dynamic tables unbounded. We would be closer to the SQL standard with
> this and pave the way for the future. I would actually support this if
> all concepts play together nicely.
>
> > In the future, we can consider extending the statement set syntax to
> > support the creation of multiple dynamic tables.
>
> It's good that we called the concept STATEMENT SET. This allows us to
> define CREATE TABLE within it, even if it might look a bit confusing.
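As a point of reference for the statement-set discussion above: Flink's existing STATEMENT SET syntax already lets a single job fan out to multiple sink tables (the table names below are placeholders).

```sql
-- Existing Flink SQL: one statement set compiles into a single job
-- that writes to multiple sink tables.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO dwd_paid_orders   SELECT * FROM ods_orders WHERE status = 'PAID';
  INSERT INTO dwd_refund_orders SELECT * FROM ods_orders WHERE status = 'REFUND';
END;
```

Extending this construct to multiple dynamic/derived tables would follow the same fan-out shape, though the exact syntax would need its own design discussion.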
> > Regards, > Timo > > [1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about > [2] > https://www.postgresql.org/docs/current/sql-creatematerializedview.html > [3] https://oracle-base.com/articles/misc/materialized-views > > On 21.03.24 04:14, Feng Jin wrote: > > Hi Ron and Lincoln > > > > Thanks for driving this discussion. I believe it will greatly improve > the > > convenience of managing user real-time pipelines. > > > > I have some questions. > > > > *Regarding Limitations of Dynamic Table:* > > > >> Does not support modifying the select statement after the dynamic table > > is created. > > > > Although currently we restrict users from modifying the query, I wonder > if > > we can provide a better way to help users rebuild it without affecting > > downstream OLAP queries. > > > > > > *Regarding the management of background jobs:* > > > > 1. From the documentation, the definitions SQL and job information are > > stored in the Catalog. Does this mean that if a system needs to adapt to > > Dynamic Tables, it also needs to store Flink's job information in the > > corresponding system? > > For example, does MySQL's Catalog need to store flink job information as > > well? > > > > > > 2. Users still need to consider how much memory is being used, how large > > the concurrency is, which type of state backend is being used, and may > need > > to set TTL expiration. > > > > > > *Regarding the Refresh Part:* > > > >> If the refresh mode is continuous and a background job is running, > > caution should be taken with the refresh command as it can lead to > > inconsistent data. > > > > When we submit a refresh command, can we help users detect if there are > any > > running jobs and automatically stop them before executing the refresh > > command? Then wait for it to complete before restarting the background > > streaming job? 
> > > > Best, > > Feng > > > > On Tue, Mar 19, 2024 at 9:40 PM Lincoln Lee <lincoln.8...@gmail.com> > wrote: > > > >> Hi Yun, > >> > >> Thank you very much for your valuable input! > >> > >> Incremental mode is indeed an attractive idea, we have also discussed > >> this, but in the current design, > >> > >> we first provided two refresh modes: CONTINUOUS and > >> FULL. Incremental mode can be introduced > >> > >> once the execution layer has the capability. > >> > >> My answer for the two questions: > >> > >> 1. > >> Yes, cascading is a good question. Current proposal provides a > >> freshness that defines a dynamic > >> table relative to the base table’s lag. If users need to consider the > >> end-to-end freshness of multiple > >> cascaded dynamic tables, he can manually split them for now. Of > >> course, how to let multiple cascaded > >> or dependent dynamic tables complete the freshness definition in a > >> simpler way, I think it can be > >> extended in the future. > >> > >> 2. > >> Cascading refresh is also a part we focus on discussing. In this flip, > >> we hope to focus as much as > >> possible on the core features (as it already involves a lot things), > >> so we did not directly introduce related > >> syntax. However, based on the current design, combined with the > >> catalog and lineage, theoretically, > >> users can also finish the cascading refresh. > >> > >> > >> Best, > >> Lincoln Lee > >> > >> > >> Yun Tang <myas...@live.com> 于2024年3月19日周二 13:45写道: > >> > >>> Hi Lincoln, > >>> > >>> Thanks for driving this discussion, and I am so excited to see this > topic > >>> being discussed in the Flink community! > >>> > >>> From my point of view, instead of the work of unifying streaming and > >> batch > >>> in DataStream API [1], this FLIP actually could make users benefit from > >> one > >>> engine to rule batch & streaming. 
> >>> > >>> If we treat this FLIP as an open-source implementation of Snowflake's > >>> dynamic tables [2], we still lack an incremental refresh mode to make > the > >>> ETL near real-time with a much cheaper computation cost. However, I > think > >>> this could be done under the current design by introducing another > >> refresh > >>> mode in the future. Although the extra work of incremental view > >> maintenance > >>> would be much larger. > >>> > >>> For the FLIP itself, I have several questions below: > >>> > >>> 1. It seems this FLIP does not consider the lag of refreshes across ETL > >>> layers from ODS ---> DWD ---> APP [3]. We currently only consider the > >>> scheduler interval, which means we cannot use lag to automatically > >> schedule > >>> the upfront micro-batch jobs to do the work. > >>> 2. To support the automagical refreshes, we should consider the lineage > >> in > >>> the catalog or somewhere else. > >>> > >>> > >>> [1] > >>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API > >>> [2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about > >>> [3] https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh > >>> > >>> Best > >>> Yun Tang > >>> > >>> > >>> ________________________________ > >>> From: Lincoln Lee <lincoln.8...@gmail.com> > >>> Sent: Thursday, March 14, 2024 14:35 > >>> To: dev@flink.apache.org <dev@flink.apache.org> > >>> Subject: Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for > >>> Simplifying Data Pipelines > >>> > >>> Hi Jing, > >>> > >>> Thanks for your attention to this flip! I'll try to answer the > following > >>> questions. > >>> > >>>> 1. How to define query of dynamic table? > >>>> Use flink sql or introducing new syntax? > >>>> If use flink sql, how to handle the difference in SQL between > streaming > >>> and > >>>> batch processing? > >>>> For example, a query including window aggregate based on processing > >> time? 
> >>>> or a query including global order by? > >>> > >>> Similar to `CREATE TABLE AS query`, here the `query` also uses Flink > sql > >>> and > >>> > >>> doesn't introduce a totally new syntax. > >>> We will not change the status respect to > >>> > >>> the difference in functionality of flink sql itself on streaming and > >>> batch, for example, > >>> > >>> the proctime window agg on streaming and global sort on batch that you > >>> mentioned, > >>> > >>> in fact, do not work properly in the > >>> other mode, so when the user modifies the > >>> > >>> refresh mode of a dynamic table that is not supported, we will throw an > >>> exception. > >>> > >>>> 2. Whether modify the query of dynamic table is allowed? > >>>> Or we could only refresh a dynamic table based on the initial query? > >>> > >>> Yes, in the current design, the query definition of the > >>> dynamic table is not allowed > >>> > >>> to be modified, and you can only refresh the data based on the > >>> initial definition. > >>> > >>>> 3. How to use dynamic table? > >>>> The dynamic table seems to be similar to the materialized view. Will > >> we > >>> do > >>>> something like materialized view rewriting during the optimization? > >>> > >>> It's true that dynamic table and materialized view > >>> are similar in some ways, but as Ron > >>> > >>> explains > >>> there are differences. In terms of optimization, automated > >>> materialization discovery > >>> > >>> similar to that supported by calcite is also a potential possibility, > >>> perhaps with the > >>> > >>> addition of automated rewriting in the future. > >>> > >>> > >>> > >>> Best, > >>> Lincoln Lee > >>> > >>> > >>> Ron liu <ron9....@gmail.com> 于2024年3月14日周四 14:01写道: > >>> > >>>> Hi, Timo > >>>> > >>>> Sorry for later response, thanks for your feedback. > >>>> Regarding your questions: > >>>> > >>>>> Flink has introduced the concept of Dynamic Tables many years ago. 
> >> How > >>>> > >>>> does the term "Dynamic Table" fit into Flink's regular tables and also > >>>> > >>>> how does it relate to Table API? > >>>> > >>>> > >>>>> I fear that adding the DYNAMIC TABLE keyword could cause confusion > >> for > >>>>> users, because a term for regular CREATE TABLE (that can be "kind of > >>>>> dynamic" as well and is backed by a changelog) is then missing. Also > >>>>> given that we call our connectors for those tables, > >> DynamicTableSource > >>>>> and DynamicTableSink. > >>>> > >>>> > >>>>> In general, I find it contradicting that a TABLE can be "paused" or > >>>>> "resumed". From an English language perspective, this does sound > >>>>> incorrect. In my opinion (without much research yet), a continuous > >>>>> updating trigger should rather be modelled as a CREATE MATERIALIZED > >>> VIEW > >>>>> (which users are familiar with?) or a new concept such as a CREATE > >> TASK > >>>>> (that can be paused and resumed?). > >>>> > >>>> > >>>> 1. > >>>> In the current concept[1], it actually includes: Dynamic Tables & > >>>> Continuous Query. Dynamic Table is just an abstract > >>>> logical concept > >>>> , which in its physical form represents either a table or a changelog > >>>> stream. It requires the combination with Continuous Query to achieve > >>>> dynamic updates of the target table similar to a database’s > >>>> Materialized View. > >>>> We hope to upgrade the Dynamic Table to a real entity that users can > >>>> operate, which combines the logical concepts of Dynamic Tables + > >>>> Continuous Query. By integrating the definition of tables and queries, > >>>> it can achieve functions similar to Materialized Views, simplifying > >>>> users' data processing pipelines. > >>>> So, the object of the suspend operation is the refresh task of the > >>>> dynamic table. 
The command `ALTER DYNAMIC TABLE table_name SUSPEND ` > >>>> is actually a shorthand for `ALTER DYNAMIC TABLE table_name SUSPEND > >>>> REFRESH` (if written in full for clarity, we can also modify it). > >>>> > >>>> 2. Initially, we also considered Materialized Views > >>>> , but ultimately decided against them. Materialized views are designed > >>>> to enhance query performance for workloads that consist of common, > >>>> repetitive query patterns. In essence, a materialized view represents > >>>> the result of a query. > >>>> However, it is not intended to support data modification. For > >>>> Lakehouse scenarios, where the ability to delete or update data is > >>>> crucial (such as compliance with GDPR, FLIP-2), materialized views > >>>> fall short. > >>>> > >>>> 3. > >>>> Compared to CREATE (regular) TABLE, CREATE DYNAMIC TABLE not only > >>>> defines metadata in the catalog but also automatically initiates a > >>>> data refresh task based on the query specified during table creation. > >>>> It dynamically executes data updates. Users can focus on data > >>>> dependencies and data generation logic. > >>>> > >>>> 4. > >>>> The new dynamic table does not conflict with the existing > >>>> DynamicTableSource and DynamicTableSink interfaces. For the developer, > >>>> all that needs to be implemented is the new CatalogDynamicTable, > >>>> without changing the implementation of source and sink. > >>>> > >>>> 5. For now, the FLIP does not consider supporting Table API operations > >> on > >>>> Dynamic Table > >>>> . However, once the SQL syntax is finalized, we can discuss this in a > >>>> separate FLIP. Currently, I have a rough idea: the Table API should > >>>> also introduce > >>>> DynamicTable operation interfaces > >>>> corresponding to the existing Table interfaces. > >>>> The TableEnvironment > >>>> will provide relevant methods to support various dynamic table > >>>> operations. 
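The suspend/resume semantics described above can be sketched as follows. Only `SUSPEND` (and its `SUSPEND REFRESH` long form) is taken from the discussion; the `RESUME` counterpart and the one-shot `REFRESH` variant are assumptions by analogy, and the table name is a placeholder.

```sql
-- Hypothetical lifecycle commands for a dynamic table's refresh task.
ALTER DYNAMIC TABLE dwd_orders SUSPEND;          -- shorthand for SUSPEND REFRESH
ALTER DYNAMIC TABLE dwd_orders SUSPEND REFRESH;  -- pause the background refresh task
ALTER DYNAMIC TABLE dwd_orders RESUME REFRESH;   -- assumed counterpart: resume it
ALTER DYNAMIC TABLE dwd_orders REFRESH;          -- assumed: trigger a one-shot refresh
```

Spelling out the object of the operation ("...REFRESH") addresses the concern that a TABLE itself cannot be "paused": it is the refresh task, not the table, that is suspended.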
The goal for the new Dynamic Table is to offer users an > >>>> experience similar to using a database, which is why we prioritize > >>>> SQL-based approaches initially. > >>>> > >>>>> How do you envision re-adding the functionality of a statement set, > >>> that > >>>>> fans out to multiple tables? This is a very important use case for > >> data > >>>>> pipelines. > >>>> > >>>> > >>>> Multi-tables is indeed a very important user scenario. In the future, > >>>> we can consider extending the statement set syntax to support the > >>>> creation of multiple dynamic tables. > >>>> > >>>> > >>>>>> Since the early days of Flink SQL, we were discussing `SELECT > >> STREAM > >>> * > >>>>> FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and > >>> EMIT, > >>>>> into other keywords DYNAMIC TABLE and FRESHNESS. But the core > >>>>> functionality is still there. I'm wondering if we should widen the > >>> scope > >>>>> (maybe not part of this FLIP but a new FLIP) to follow the standard > >>> more > >>>>> closely. Making `SELECT * FROM t` bounded by default and use new > >> syntax > >>>>> for the dynamic behavior. Flink 2.0 would be the perfect time for > >> this, > >>>>> however, it would require careful discussions. What do you think? > >>>> > >>>> > >>>> The query part indeed requires a separate FLIP > >>>> for discussion, as it involves changes to the default behavior. > >>>> > >>>> > >>>> [1] > >>>> > >>>> > >>> > >> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables > >>>> > >>>> > >>>> Best, > >>>> > >>>> Ron > >>>> > >>>> > >>>> Jing Zhang <beyond1...@gmail.com> 于2024年3月13日周三 15:19写道: > >>>> > >>>>> Hi, Lincoln & Ron, > >>>>> > >>>>> Thanks for the proposal. > >>>>> > >>>>> I agree with the question raised by Timo. > >>>>> > >>>>> Besides, I have some other questions. > >>>>> 1. How to define query of dynamic table? > >>>>> Use flink sql or introducing new syntax? 
> >>>>> If use flink sql, how to handle the difference in SQL between > >> streaming > >>>> and > >>>>> batch processing? > >>>>> For example, a query including window aggregate based on processing > >>> time? > >>>>> or a query including global order by? > >>>>> > >>>>> 2. Whether modify the query of dynamic table is allowed? > >>>>> Or we could only refresh a dynamic table based on initial query? > >>>>> > >>>>> 3. How to use dynamic table? > >>>>> The dynamic table seems to be similar with materialized view. Will > >> we > >>> do > >>>>> something like materialized view rewriting during the optimization? > >>>>> > >>>>> Best, > >>>>> Jing Zhang > >>>>> > >>>>> > >>>>> Timo Walther <twal...@apache.org> 于2024年3月13日周三 01:24写道: > >>>>> > >>>>>> Hi Lincoln & Ron, > >>>>>> > >>>>>> thanks for proposing this FLIP. I think a design similar to what > >> you > >>>>>> propose has been in the heads of many people, however, I'm > >> wondering > >>>> how > >>>>>> this will fit into the bigger picture. > >>>>>> > >>>>>> I haven't deeply reviewed the FLIP yet, but would like to ask some > >>>>>> initial questions: > >>>>>> > >>>>>> Flink has introduced the concept of Dynamic Tables many years ago. > >>> How > >>>>>> does the term "Dynamic Table" fit into Flink's regular tables and > >>> also > >>>>>> how does it relate to Table API? > >>>>>> > >>>>>> I fear that adding the DYNAMIC TABLE keyword could cause confusion > >>> for > >>>>>> users, because a term for regular CREATE TABLE (that can be "kind > >> of > >>>>>> dynamic" as well and is backed by a changelog) is then missing. > >> Also > >>>>>> given that we call our connectors for those tables, > >>> DynamicTableSource > >>>>>> and DynamicTableSink. > >>>>>> > >>>>>> In general, I find it contradicting that a TABLE can be "paused" or > >>>>>> "resumed". From an English language perspective, this does sound > >>>>>> incorrect. 
In my opinion (without much research yet), a continuous > >>>>>> updating trigger should rather be modelled as a CREATE MATERIALIZED > >>>> VIEW > >>>>>> (which users are familiar with?) or a new concept such as a CREATE > >>> TASK > >>>>>> (that can be paused and resumed?). > >>>>>> > >>>>>> How do you envision re-adding the functionality of a statement set, > >>>> that > >>>>>> fans out to multiple tables? This is a very important use case for > >>> data > >>>>>> pipelines. > >>>>>> > >>>>>> Since the early days of Flink SQL, we were discussing `SELECT > >> STREAM > >>> * > >>>>>> FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and > >>>> EMIT, > >>>>>> into other keywords DYNAMIC TABLE and FRESHNESS. But the core > >>>>>> functionality is still there. I'm wondering if we should widen the > >>>> scope > >>>>>> (maybe not part of this FLIP but a new FLIP) to follow the standard > >>>> more > >>>>>> closely. Making `SELECT * FROM t` bounded by default and use new > >>> syntax > >>>>>> for the dynamic behavior. Flink 2.0 would be the perfect time for > >>> this, > >>>>>> however, it would require careful discussions. What do you think? > >>>>>> > >>>>>> Regards, > >>>>>> Timo > >>>>>> > >>>>>> > >>>>>> On 11.03.24 08:23, Ron liu wrote: > >>>>>>> Hi, Dev > >>>>>>> > >>>>>>> > >>>>>>> Lincoln Lee and I would like to start a discussion about > >> FLIP-435: > >>>>>>> Introduce a New Dynamic Table for Simplifying Data Pipelines. > >>>>>>> > >>>>>>> > >>>>>>> This FLIP is designed to simplify the development of data > >>> processing > >>>>>>> pipelines. With Dynamic Tables with uniform SQL statements and > >>>>>>> freshness, users can define batch and streaming transformations > >> to > >>>>>>> data in the same way, accelerate ETL pipeline development, and > >>> manage > >>>>>>> task scheduling automatically. > >>>>>>> > >>>>>>> > >>>>>>> For more details, see FLIP-435 [1]. Looking forward to your > >>> feedback. 
> >>>>>>> > >>>>>>> > >>>>>>> [1] > >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> > >>>>>>> Lincoln & Ron > >>>>>>> > >>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > > >