I think Flink should behave similarly to other DBMSs. Other DBMSs do not allow querying the history of a table, even though the DBMS has seen all changes to the table (as transactions, or directly as a changelog if the table was replicated) and recorded them in its log. You need to declare a table as TEMPORAL to be able to look up previous versions.
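To illustrate what such an explicit declaration looks like elsewhere, here is a minimal sketch in SQL Server syntax (the system whose docs are linked as [1] further down in this thread). The table, history table, and column names (dbo.Rates, dbo.RatesHistory, ValidFrom, ValidTo) and the timestamp are made up for this example. The point is only that the history becomes queryable because the table was declared as system-versioned, not because the engine happens to have the data.

```sql
-- Hypothetical example table; all names are assumptions for illustration.
-- The PERIOD FOR SYSTEM_TIME clause and SYSTEM_VERSIONING = ON are what
-- tell SQL Server to track and expose the table's history.
CREATE TABLE dbo.Rates (
    Currency  CHAR(3)        NOT NULL PRIMARY KEY CLUSTERED,
    Rate      DECIMAL(18, 6) NOT NULL,
    ValidFrom DATETIME2 GENERATED ALWAYS AS ROW START NOT NULL,
    ValidTo   DATETIME2 GENERATED ALWAYS AS ROW END   NOT NULL,
    PERIOD FOR SYSTEM_TIME (ValidFrom, ValidTo)
)
WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.RatesHistory));

-- Only a system-versioned table can be queried for a previous version.
SELECT Currency, Rate
FROM dbo.Rates FOR SYSTEM_TIME AS OF '2020-05-01T00:00:00'
WHERE Currency = 'EUR';
```

Running the same FOR SYSTEM_TIME AS OF query against a table that was not declared this way is rejected, which is the behavior I am arguing Flink should mirror.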
Flink is in a very similar situation. Even though we've seen the physical data of all changes and could also store it for some time, I think we should only allow queries against previous versions of a table (with FOR SYSTEM_TIME AS OF) if the table was defined as TEMPORAL.
IMO this is not about having the data to return a previous version of a table (other DBMSs have the data as well); it's about whether or not the user should tell the system to allow access to the table's history.
As I said before, we could of course declare that all tables are automatically temporal and versioned on the only event-time attribute (what if there were more than one?), but I personally don't like such implicit conventions.

I don't have a concrete proposal for a syntax to declare the version attribute of a table, but I agree that the "PERIOD FOR SYSTEM_TIME" syntax doesn't look very suitable for our purposes.
I'm sure we can come up with a better syntax for this.

Best, Fabian

On Sat, May 9, 2020 at 03:57, Kurt Young <ykt...@gmail.com> wrote:

> All tables described by Flink's DDL are dynamic tables. But a dynamic
> table is a logical concept, not a physical thing.
> Physically, a dynamic table has two different forms: one is a materialized
> table that changes over time (e.g., a database table or an HBase table);
> the other is a stream that represents the changelog, typically stored in
> a message queue (e.g., Kafka). For the latter, I think the records
> already represent the history of the dynamic table, based on
> stream-table duality.
>
> So regarding:
> > Of course we could define that Flink implicitly tracks the (recent,
> > i.e., within watermark bounds) history of all dynamic tables.
> I don't think this is Flink implicitly tracking the history of the dynamic
> table; the physical data of the table already is the history itself.
> What Flink does is read the history out and organize it in preparation
> for further operations.
>
> I agree that I relied on another implicit convention though, namely
> treating the event time as the version of the dynamic table. Strictly
> speaking, we should use another syntax, "PERIOD FOR SYSTEM_TIME" [1], to
> indicate the version of the table. I've been thinking about this for quite
> a bit, and it turns out that its semantics are too similar to Flink's
> event time. It will be harder for users to understand what this means if
> we treat event time and this "PERIOD FOR SYSTEM_TIME" differently. And I'm
> also afraid that we will introduce lots of bugs because not all
> developers will understand this easily.
> [1]
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
>
> Best,
> Kurt
>
>
> On Sat, May 9, 2020 at 5:32 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> I think we need the TEMPORAL TABLE syntax because temporal tables are
>> conceptually more than just regular tables.
>> In addition to being a table that always holds the latest values (and
>> can thereby serve as input to a continuous query), the system also needs to
>> track the history of such a table to be able to serve different versions of
>> the table (as requested by FOR SYSTEM_TIME AS OF).
>>
>> Of course we could define that Flink implicitly tracks the (recent, i.e.,
>> within watermark bounds) history of all dynamic tables.
>> However, there's one more thing the system needs to know to be able to
>> correctly evaluate FOR SYSTEM_TIME AS OF x, namely which time attribute to
>> use as the version of the temporal table.
>> IMO it would be good to make this explicit, especially if there is a plan
>> to eventually support multiple event-time attributes / watermarks
>> on a table.
>> Just using the only event time attribute would be a bit too much
>> convention magic for my taste (others might of course have a different
>> opinion on this subject).
>>
>> So I agree with Kurt that we don't necessarily need the TEMPORAL TABLE
>> statement if we agree on a few implicit conventions (implicit history table
>> + implicit versioning attribute).
>> I'm not a big fan of such conventions and think it's better to make such
>> things explicit.
>>
>> For temporal joins with processing time semantics, we can use regular
>> dynamic tables without declaring them as TEMPORAL since we don't need a
>> history table to derive the current version.
>> AFAIK, these are already the semantics we use for LookupTableSource.
>>
>> Regarding the question of append-only tables and temporal tables, I'd
>> like to share some more thoughts.
>> As I said above, a temporal table consists of a regular dynamic table A
>> that holds the latest version and a table H that holds the history of A.
>> 1) When defining a temporal table based on a regular dynamic table (with
>> a primary key), we provide A and Flink automatically maintains H
>> (bounded by watermarks).
>> 2) When defining a temporal table based on an append-only table, Flink
>> ingests H and we use the temporal table function to turn it into a dynamic
>> table with a primary key, i.e., into A. This conversion could also be done
>> during ingestion by treating the append-only stream as an upsert changelog
>> and converting it into a dynamic table with a PK that serves as table A
>> (just as in case 1).
>>
>> As Jark said, "converting append-only table into changelog table" was
>> moved to future work.
>> Until then, we could only define a TEMPORAL TABLE on a table that is
>> derived from a proper changelog stream with a specific encoding.
>> The TEMPORAL VIEW would be a shortcut that would allow us to perform the
>> conversion in Flink SQL (and not within the connector) and to define the
>> temporal properties on the result of the view.
>>
>> Cheers,
>> Fabian
>>
>>
>>
>> On Fri, May 8, 2020 at 08:29, Kurt Young <ykt...@gmail.com> wrote:
>>
>>> I might have missed something, but why do we need a new "TEMPORAL TABLE"
>>> syntax?
>>>
>>> According to Fabian's first mail:
>>>
>>> > Hence, the requirements for a temporal table are:
>>> > * The temporal table has a primary key / unique attribute
>>> > * The temporal table has a time-attribute that defines the start of the
>>> > validity interval of a row (processing time or event time)
>>> > * The system knows that the history of the table is tracked and can
>>> > infer how to look up a version.
>>>
>>> I think a primary key plus a proper event time attribute is already
>>> sufficient. So a join query that looks like:
>>>
>>> "Fact join Dim FOR SYSTEM_TIME AS OF Fact.some_event_time ON Fact.id =
>>> Dim.id"
>>>
>>> would mean: for every record belonging to Fact, use Fact.some_event_time
>>> as Dim's version (which keeps only the records from the Dim table with
>>> an event time less than or equal to Fact.some_event_time, and keeps only
>>> one record for each primary key).
>>>
>>> The temporal behavior is actually triggered by the join syntax "FOR
>>> SYSTEM_TIME AS OF Fact.some_event_time",
>>> not by the DDL description.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, May 8, 2020 at 10:51 AM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I agree with what Fabian said above.
>>>> Besides, IMO, (3) has a lower priority and will involve many more
>>>> things.
>>>> It makes sense to me to do it in two phases.
>>>>
>>>> Regarding (3), the key point in converting an append-only table into a
>>>> changelog table is that the framework should know the operation type,
>>>> so we introduced a special CREATE VIEW syntax to do it in the
>>>> documentation
>>>> [1]. Here is an example:
>>>>
>>>> -- my_binlog table is registered as an append-only table
>>>> CREATE TABLE my_binlog (
>>>>   before ROW<...>,
>>>>   after ROW<...>,
>>>>   op STRING,
>>>>   op_ms TIMESTAMP(3)
>>>> ) WITH (
>>>>   'connector.type' = 'kafka',
>>>>   ...
>>>> );
>>>>
>>>> -- interpret my_binlog as a changelog: operation type from op, keyed on id
>>>> CREATE VIEW my_table AS
>>>> SELECT
>>>>   after.*
>>>> FROM my_binlog
>>>> CHANGELOG OPERATION BY op
>>>> UPDATE KEY BY (id);
>>>>
>>>> -- my_table will materialize the insert/delete/update changes
>>>> -- if we have 4 records in dbz:
>>>> -- a create for 1004
>>>> -- an update for 1004
>>>> -- a create for 1005
>>>> -- a delete for 1004
>>>> > SELECT COUNT(*) FROM my_table;
>>>> +-----------+
>>>> | COUNT(*)  |
>>>> +-----------+
>>>> | 1         |
>>>> +-----------+
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]:
>>>>
>>>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb
>>>>
>>>>
>>>> On Fri, 8 May 2020 at 00:24, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> > Thanks for the summary, Konstantin.
>>>> > I think you got all points right.
>>>> >
>>>> > IMO, the way forward would be to work on a FLIP to define
>>>> > * the concept of temporal tables,
>>>> > * how to feed them from retraction tables,
>>>> > * how to feed them from append-only tables,
>>>> > * their specification with CREATE TEMPORAL TABLE,
>>>> > * how to use temporal tables in temporal table joins,
>>>> > * how (if at all) to use temporal tables in other types of queries.
>>>> >
>>>> > We would keep the LATERAL TABLE syntax because it is used for regular
>>>> > table-valued functions.
>>>> > However, we would probably remove the TemporalTableFunction (which is
>>>> > a built-in table-valued function) after it has been deprecated for a
>>>> > while.
>>>> >
>>>> > Cheers, Fabian
>>>> >
>>>> > On Thu, May 7, 2020 at 18:03, Konstantin Knauf <kna...@apache.org>
>>>> > wrote:
>>>> >
>>>> >> Hi everyone,
>>>> >>
>>>> >> Thanks everyone for joining the discussion on this. Please let me
>>>> >> summarize what I have understood so far.
>>>> >> >>>> >> 1) For joining an append-only table and a temporal table the syntax >>>> the >>>> >> "FOR >>>> >> SYSTEM_TIME AS OF <time-attribute>" seems to be preferred (Fabian, >>>> Timo, >>>> >> Seth). >>>> >> >>>> >> 2) To define a temporal table based on a changelog stream from an >>>> external >>>> >> system CREATE TEMPORAL TABLE (as suggested by Timo/Fabian) could be >>>> used. >>>> >> 3) In order to also support temporal tables derived from an >>>> append-only >>>> >> stream, we either need to support TEMPORAL VIEW (as mentioned by >>>> Fabian) >>>> >> or >>>> >> need to have a way to convert an append-only table into a changelog >>>> table >>>> >> (briefly discussed in [1]). It is not completely clear to me how a >>>> >> temporal >>>> >> table based on an append-only table would be with the syntax >>>> proposed in >>>> >> [1] and 2). @Jark Wu <imj...@gmail.com> could you elaborate a bit on >>>> >> that? >>>> >> >>>> >> How do we move forward with this? >>>> >> >>>> >> * It seems that a two-phased approach (1 + 2 now, 3 later) makes >>>> sense. >>>> >> What do you think? * If we proceed like this, what would this mean >>>> for the >>>> >> current syntax of LATERAL TABLE? Would we keep it? Would we >>>> eventually >>>> >> deprecate and drop it? Since only after 3) we would be on par with >>>> the >>>> >> current temporal table function join, I assume, we could only drop it >>>> >> thereafter. >>>> >> >>>> >> Thanks, Konstantin >>>> >> >>>> >> [1] >>>> >> >>>> >> >>>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.kduaw9moein6 >>>> >> >>>> >> >>>> >> On Sat, Apr 18, 2020 at 3:07 PM Jark Wu <imj...@gmail.com> wrote: >>>> >> >>>> >> > Hi Fabian, >>>> >> > >>>> >> > Just to clarify a little bit, we decided to move the "converting >>>> >> > append-only table into changelog table" into future work. 
>>>> >> > So FLIP-105 only introduced some CDC formats (Debezium) and new
>>>> >> > TableSource interfaces proposed in FLIP-95.
>>>> >> > I should have started a new FLIP for the new CDC formats and kept
>>>> >> > FLIP-105 as it was to avoid the confusion, sorry about that.
>>>> >> >
>>>> >> > Best,
>>>> >> > Jark
>>>> >> >
>>>> >> >
>>>> >> > On Sat, 18 Apr 2020 at 00:35, Fabian Hueske <fhue...@gmail.com>
>>>> >> > wrote:
>>>> >> >
>>>> >> > > Thanks Jark!
>>>> >> > >
>>>> >> > > I certainly need to read up on FLIP-105 (and I'll try to adjust my
>>>> >> > > terminology to changelog table from now on ;-) )
>>>> >> > > If FLIP-105 addresses the issue of converting an append-only table
>>>> >> > > into a changelog table that upserts on primary key (basically what
>>>> >> > > the VIEW definition in my first email did),
>>>> >> > > TEMPORAL VIEWs become much less important.
>>>> >> > > In that case, we would be well served with TEMPORAL TABLE and
>>>> >> > > TEMPORAL VIEW would be a nice-to-have feature for some later time.
>>>> >> > >
>>>> >> > > Cheers, Fabian
>>>> >> > >
>>>> >> > >
>>>> >> > > On Fri, Apr 17, 2020 at 18:13, Jark Wu <imj...@gmail.com> wrote:
>>>> >> > >
>>>> >> > > > Hi Fabian,
>>>> >> > > >
>>>> >> > > > I think converting an append-only table into a temporal table
>>>> >> > > > involves two things:
>>>> >> > > > (1) converting the append-only table into a changelog table (or
>>>> >> > > > retraction table, as you said)
>>>> >> > > > (2) defining the converted changelog table (which may be a view
>>>> >> > > > now) as temporal (or history-tracked).
>>>> >> > > >
>>>> >> > > > The first thing is also mentioned and discussed in the FLIP-105
>>>> >> > > > design draft [1], which proposed a syntax
>>>> >> > > > to convert the append-only table into a changelog table.
>>>> >> > > > >>>> >> > > > I think TEMPORAL TABLE is quite straightforward and simple, >>>> and can >>>> >> > > satisfy >>>> >> > > > most existing changelog >>>> >> > > > data with popular CDC formats. TEMPORAL VIEW is flexible but >>>> will >>>> >> > involve >>>> >> > > > more SQL codes. I think >>>> >> > > > we can support them both. >>>> >> > > > >>>> >> > > > Best, >>>> >> > > > Jark >>>> >> > > > >>>> >> > > > [1]: >>>> >> > > > >>>> >> > > > >>>> >> > > >>>> >> > >>>> >> >>>> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb >>>> >> > > > >>>> >> > > > On Fri, 17 Apr 2020 at 23:52, Fabian Hueske <fhue...@gmail.com >>>> > >>>> >> wrote: >>>> >> > > > >>>> >> > > > > Hi, >>>> >> > > > > >>>> >> > > > > I agree with most of what Timo said. >>>> >> > > > > >>>> >> > > > > The TEMPORAL keyword (which unfortunately might be easily >>>> confused >>>> >> > with >>>> >> > > > > TEMPORARY...) looks very intuitive and I think using the >>>> only time >>>> >> > > > > attribute for versioning would be a good choice. >>>> >> > > > > >>>> >> > > > > However, TEMPORAL TABLE on retraction tables do not solve >>>> the full >>>> >> > > > problem. >>>> >> > > > > I believe there will be also cases where we need to derive a >>>> >> temporal >>>> >> > > > table >>>> >> > > > > from an append only table (what TemporalTableFunctions do >>>> right >>>> >> now). >>>> >> > > > > I think the best choice for this would be TEMPORAL VIEW but >>>> as I >>>> >> > > > explained, >>>> >> > > > > it might be a longer way until this can be supported. >>>> >> > > > > TEMPORAL VIEW would also address the problem of >>>> preprocessing. >>>> >> > > > > >>>> >> > > > > > Regarding retraction table with a primary key and a >>>> >> time-attribute: >>>> >> > > > > > These semantics are still unclear to me. Can retractions >>>> only >>>> >> occur >>>> >> > > > > > within watermarks? 
>>>> >> > > > > > Or are they also used for representing late updates?
>>>> >> > > > >
>>>> >> > > > > Time attributes and retraction streams are a challenging topic
>>>> >> > > > > that I haven't completely understood yet.
>>>> >> > > > > So far we have always treated time attributes as part of the data.
>>>> >> > > > > In combination with retractions, it seems that they become
>>>> >> > > > > metadata that specifies when a change was made.
>>>> >> > > > > I think this is different from treating time attributes as
>>>> >> > > > > regular data.
>>>> >> > > > >
>>>> >> > > > > Cheers, Fabian
>>>> >> > > > >
>>>> >> > > > >
>>>> >> > > > > On Fri, Apr 17, 2020 at 17:23, Seth Wiesman <sjwies...@gmail.com>
>>>> >> > > > > wrote:
>>>> >> > > > >
>>>> >> > > > > > I really like the TEMPORAL keyword, I find it very intuitive.
>>>> >> > > > > >
>>>> >> > > > > > > The down side of this approach would be that an additional
>>>> >> > > > > > > preprocessing step would not be possible anymore because
>>>> >> > > > > > > there is no preceding view.
>>>> >> > > > > >
>>>> >> > > > > > Yes and no. My understanding is we are not talking about making
>>>> >> > > > > > any changes to how temporal tables are defined in the Table
>>>> >> > > > > > API. Since you cannot currently define temporal table functions
>>>> >> > > > > > in pure SQL applications, but only pre-register them in YAML,
>>>> >> > > > > > you can't do any pre-processing as it stands today.
>>>> >> > > > > > Preprocessing may be a generally useful feature, I'm not sure,
>>>> >> > > > > > but this syntax does not lose us anything in pure SQL
>>>> >> > > > > > applications.
>>>> >> > > > > >
>>>> >> > > > > > > These semantics are still unclear to me. Can retractions only
>>>> >> > > > > > > occur within watermarks?
Or are they also used for >>>> representing late >>>> >> > > > updates? >>>> >> > > > > > > >>>> >> > > > > > >>>> >> > > > > > I do not know the SQL standard well enough to give a >>>> principled >>>> >> > > > response >>>> >> > > > > to >>>> >> > > > > > this question. However, in my observation of production >>>> >> workloads, >>>> >> > > > users >>>> >> > > > > of >>>> >> > > > > > temporal table functions are doing so to denormalize star >>>> >> schemas >>>> >> > > > before >>>> >> > > > > > performing further transformations and aggregations and >>>> expect >>>> >> the >>>> >> > > > output >>>> >> > > > > > to be an append stream. With the ongoing work to better >>>> support >>>> >> > > > > changelogs, >>>> >> > > > > > the need for users to understand the differences in append >>>> vs >>>> >> > upsert >>>> >> > > in >>>> >> > > > > > their query may be diminishing but everyone else on this >>>> thread >>>> >> can >>>> >> > > > > better >>>> >> > > > > > speak to that. >>>> >> > > > > > >>>> >> > > > > > Seth >>>> >> > > > > > >>>> >> > > > > > On Fri, Apr 17, 2020 at 10:03 AM Timo Walther < >>>> >> twal...@apache.org> >>>> >> > > > > wrote: >>>> >> > > > > > >>>> >> > > > > > > Hi Fabian, >>>> >> > > > > > > >>>> >> > > > > > > thank you very much for this great summary! >>>> >> > > > > > > >>>> >> > > > > > > I wasn't aware of the Polymorphic Table Functions >>>> standard. >>>> >> This >>>> >> > > is a >>>> >> > > > > > > very interesting topic that we should definitely >>>> consider in >>>> >> the >>>> >> > > > > future. >>>> >> > > > > > > Maybe this could also help us in defining tables more >>>> >> dynamically >>>> >> > > > > within >>>> >> > > > > > > a query. It could help solving problems as discussed in >>>> >> FLIP-113. 
>>>> >> > > > > > > >>>> >> > > > > > > Regarding joining: >>>> >> > > > > > > >>>> >> > > > > > > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead >>>> of the >>>> >> > > > current >>>> >> > > > > > > `LATERAL TABLE(rates(x))` syntax. A function that also >>>> behaves >>>> >> > > like a >>>> >> > > > > > > table and needs this special `LATERAL` keyword during >>>> joining >>>> >> is >>>> >> > > not >>>> >> > > > > > > very intuitive. The PTF could be used once they are fully >>>> >> > supported >>>> >> > > > by >>>> >> > > > > > > Calcite and we have the big picture how to also use them >>>> for >>>> >> > other >>>> >> > > > > > > time-based operations (windows?, joins?). >>>> >> > > > > > > >>>> >> > > > > > > Regarding how represent a temporal table: >>>> >> > > > > > > >>>> >> > > > > > > I think that our current DDL, current LookupTableSource >>>> and >>>> >> > > temporal >>>> >> > > > > > > tables can fit nicely together. >>>> >> > > > > > > >>>> >> > > > > > > How about we simply introduce an additional keyword >>>> >> `TEMPORAL` to >>>> >> > > > > > > indicate history tracking semantics? I think this is the >>>> >> minimal >>>> >> > > > > > > invasive solution: >>>> >> > > > > > > >>>> >> > > > > > > CREATE TEMPORAL TABLE rates ( >>>> >> > > > > > > currency CHAR(3) NOT NULL PRIMARY KEY, >>>> >> > > > > > > rate DOUBLE, >>>> >> > > > > > > rowtime TIMESTAMP, >>>> >> > > > > > > WATERMARK FOR rowtime AS rowtime - INTERVAL '5' >>>> MINUTE) >>>> >> > > > > > > WITH (...); >>>> >> > > > > > > >>>> >> > > > > > > - The primary key would be defined by the DDL. >>>> >> > > > > > > - The available time attribute would be defined by the >>>> DDL. >>>> >> > Either >>>> >> > > as >>>> >> > > > > > > the only time attribute of the table or we introduce a >>>> special >>>> >> > > > > > > constraint similar to `PRIMARY KEY`. 
>>>> >> > > > > > > >>>> >> > > > > > > The down side of this approach would be that an >>>> additional >>>> >> > > > > preprocessing >>>> >> > > > > > > step would not be possible anymore because there is no >>>> >> preceding >>>> >> > > > view. >>>> >> > > > > > > >>>> >> > > > > > > The `TEMPORAL` semantic can be stored in the properties >>>> of the >>>> >> > > table >>>> >> > > > > > > when writing to a catalog. We do the same for watermarks >>>> and >>>> >> > > computed >>>> >> > > > > > > columns. >>>> >> > > > > > > >>>> >> > > > > > > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x` >>>> >> would >>>> >> > > only >>>> >> > > > > > > work on processing time by a lookup into the external >>>> system >>>> >> or >>>> >> > on >>>> >> > > > > > > event-time by using the time semantics that the external >>>> >> system >>>> >> > > > > supports. >>>> >> > > > > > > >>>> >> > > > > > > Regarding retraction table with a primary key and a >>>> >> > time-attribute: >>>> >> > > > > > > >>>> >> > > > > > > These semantics are still unclear to me. Can retractions >>>> only >>>> >> > occur >>>> >> > > > > > > within watermarks? Or are they also used for >>>> representing late >>>> >> > > > updates? >>>> >> > > > > > > >>>> >> > > > > > > Regards, >>>> >> > > > > > > Timo >>>> >> > > > > > > >>>> >> > > > > > > >>>> >> > > > > > > On 17.04.20 14:34, Fabian Hueske wrote: >>>> >> > > > > > > > Hi all, >>>> >> > > > > > > > >>>> >> > > > > > > > First of all, I appologize for the text wall that's >>>> >> > following... >>>> >> > > > ;-) >>>> >> > > > > > > > >>>> >> > > > > > > > A temporal table join joins an append-only table and a >>>> >> temporal >>>> >> > > > > table. 
>>>> >> > > > > > > > The question about how to represent a temporal table >>>> join >>>> >> boils >>>> >> > > > down >>>> >> > > > > to >>>> >> > > > > > > two >>>> >> > > > > > > > questions: >>>> >> > > > > > > > >>>> >> > > > > > > > 1) How to represent a temporal table >>>> >> > > > > > > > 2) How to specify the join of an append-only table and >>>> a >>>> >> > temporal >>>> >> > > > > table >>>> >> > > > > > > > >>>> >> > > > > > > > I'll discuss these points separately. >>>> >> > > > > > > > >>>> >> > > > > > > > # 1 How to represent a temporal table >>>> >> > > > > > > > >>>> >> > > > > > > > A temporal table is a table that can be looked up with >>>> a >>>> >> time >>>> >> > > > > parameter >>>> >> > > > > > > and >>>> >> > > > > > > > which returns the rows of the table at that point in >>>> time / >>>> >> for >>>> >> > > > that >>>> >> > > > > > > > version. >>>> >> > > > > > > > In order to be able to (conceptually) look up previous >>>> >> > versions, >>>> >> > > a >>>> >> > > > > > > temporal >>>> >> > > > > > > > table must be (conceptually) backed by a history table >>>> that >>>> >> > > tracks >>>> >> > > > > all >>>> >> > > > > > > > previous versions (see SqlServer docs [1]). >>>> >> > > > > > > > In the context of our join, we added another >>>> restriction >>>> >> namely >>>> >> > > > that >>>> >> > > > > > the >>>> >> > > > > > > > table must have a primary key, i.e., there is only one >>>> row >>>> >> for >>>> >> > > each >>>> >> > > > > > > version >>>> >> > > > > > > > for each unique key. 
>>>> >> > > > > > > > >>>> >> > > > > > > > Hence, the requirements for a temporal table are: >>>> >> > > > > > > > * The temporal table has a primary key / unique >>>> attribute >>>> >> > > > > > > > * The temporal table has a time-attribute that defines >>>> the >>>> >> > start >>>> >> > > of >>>> >> > > > > the >>>> >> > > > > > > > validity interval of a row (processing time or event >>>> time) >>>> >> > > > > > > > * The system knows that the history of the table is >>>> tracked >>>> >> and >>>> >> > > can >>>> >> > > > > > infer >>>> >> > > > > > > > how to look up a version. >>>> >> > > > > > > > >>>> >> > > > > > > > There are two possible types of input from which we >>>> want to >>>> >> > > create >>>> >> > > > > > > temporal >>>> >> > > > > > > > tables (that I'm aware of): >>>> >> > > > > > > > >>>> >> > > > > > > > * append-only tables, i.e., tables that contain the >>>> full >>>> >> change >>>> >> > > > > history >>>> >> > > > > > > > * retraction tables, i.e., tables that are updating >>>> and do >>>> >> not >>>> >> > > > > remember >>>> >> > > > > > > the >>>> >> > > > > > > > history. >>>> >> > > > > > > > >>>> >> > > > > > > > There are a few ways to do this: >>>> >> > > > > > > > >>>> >> > > > > > > > ## 1.1 Defining a VIEW on an append-only table with a >>>> time >>>> >> > > > attribute. >>>> >> > > > > > > > >>>> >> > > > > > > > The following view definition results in a view that >>>> >> provides >>>> >> > the >>>> >> > > > > > latest >>>> >> > > > > > > > rate for each currency. 
>>>> >> > > > > > > >
>>>> >> > > > > > > > CREATE VIEW rates AS
>>>> >> > > > > > > > SELECT
>>>> >> > > > > > > >   currency, MAX(rate) as rate, MAX(rowtime) as rowtime
>>>> >> > > > > > > > FROM rates_history rh1
>>>> >> > > > > > > > WHERE
>>>> >> > > > > > > >   rh1.rowtime = (
>>>> >> > > > > > > >     SELECT MAX(rowtime)
>>>> >> > > > > > > >     FROM rates_history rh2
>>>> >> > > > > > > >     WHERE rh2.currency = rh1.currency)
>>>> >> > > > > > > > GROUP BY currency
>>>> >> > > > > > > > WITH (
>>>> >> > > > > > > >   'historytracking' = 'true',
>>>> >> > > > > > > >   'historytracking.starttime' = 'rowtime');
>>>> >> > > > > > > >
>>>> >> > > > > > > > However, we also need to tell the system to track the history of all
>>>> >> > > > > > > > changes of the view in order to be able to look it up.
>>>> >> > > > > > > > That's what the properties in the WITH clause are for (inspired by
>>>> >> > > > > > > > SqlServer's TEMPORAL TABLE DDL syntax).
>>>> >> > > > > > > > Note that this is *not* a syntax proposal but only meant to show which
>>>> >> > > > > > > > information is needed.
>>>> >> > > > > > > > This view allows looking up any version of the "rates" view.
>>>> >> > > > > > > >
>>>> >> > > > > > > > In addition to designing and implementing the DDL syntax for views that
>>>> >> > > > > > > > support temporal lookups, the optimizer would need to understand the
>>>> >> > > > > > > > semantics of the view definition in depth.
>>>> >> > > > > > > > Among other things it needs to understand that the MAX() aggregation on
>>>> >> > > > > > > > the time-attribute preserves its watermark alignment.
>>>> >> > > > > > > > AFAIK, this is not the case at the moment (the time attribute would be
>>>> >> > > > > > > > converted into a regular TIMESTAMP and lose its time attribute
>>>> >> > > > > > > > properties).
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 1.2 A retraction table with a primary key and a time-attribute.
>>>> >> > > > > > > >
>>>> >> > > > > > > > On paper it looks like such a table would automatically qualify as a
>>>> >> > > > > > > > time-versioned table because it completely fulfills the requirements.
>>>> >> > > > > > > > However, I don't think we can use it *as is* as a temporal table if we
>>>> >> > > > > > > > want to have clean semantics.
>>>> >> > > > > > > > The problem here is the "lost history" of the retraction table. The
>>>> >> > > > > > > > dynamic table that is defined on the retraction stream only stores the
>>>> >> > > > > > > > latest version (even though it sees all versions).
>>>> >> > > > > > > > Conceptually, a temporal table can look up the version of the table at
>>>> >> > > > > > > > any point in time because it is backed by a history table.
>>>> >> > > > > > > > If this information is not available, we cannot have a semantically
>>>> >> > > > > > > > clean definition of the join IMO.
>>>> >> > > > > > > >
>>>> >> > > > > > > > Therefore we should define the table in a way that the system knows that
>>>> >> > > > > > > > the history is tracked.
>>>> >> > > > > > > > MSSQL uses a syntax similar to this one:
>>>> >> > > > > > > >
>>>> >> > > > > > > > CREATE TABLE rates (
>>>> >> > > > > > > >   currency CHAR(3) NOT NULL PRIMARY KEY,
>>>> >> > > > > > > >   rate DOUBLE,
>>>> >> > > > > > > >   rowtime TIMESTAMP,
>>>> >> > > > > > > >   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
>>>> >> > > > > > > > WITH (
>>>> >> > > > > > > >   'historytracking' = 'true',
>>>> >> > > > > > > >   'historytracking.starttime' = 'rowtime');
>>>> >> > > > > > > >
>>>> >> > > > > > > > The 'historytracking' properties would declare that the table tracks its
>>>> >> > > > > > > > history and also specify the attribute (rowtime) that is used for
>>>> >> > > > > > > > versioning.
>>>> >> > > > > > > >
>>>> >> > > > > > > > ## 1.3 Registering a TableFunction that takes an append-only table with time attribute
>>>> >> > > > > > > >
>>>> >> > > > > > > > The TableFunction requires a few parameters:
>>>> >> > > > > > > > * the source table from which to derive the temporal table
>>>> >> > > > > > > > * the key attribute on which the versions of the source table should be
>>>> >> > > > > > > > computed
>>>> >> > > > > > > > * the time attribute that defines the versions
>>>> >> > > > > > > > * a lookup timestamp for the version that is returned.
>>>> >> > > > > > > >
>>>> >> > > > > > > > The reasons why we chose the TableFunction approach over the VIEW
>>>> >> > > > > > > > approach so far were:
>>>> >> > > > > > > > * It is easier for the optimizer to identify a built-in table function
>>>> >> > > > > > > > than to analyze and reason about a generic VIEW.
>>>> >> > > > > > > > * We would need to make the optimizer a lot smarter to infer all the
>>>> >> > > > > > > > properties from the generic VIEW definition that we need for a temporal
>>>> >> > > > > > > > table join.
>>>> >> > > > > > > > * Passing a parameter to a function is a known thing, passing a
>>>> >> > > > > > > > parameter to a VIEW not so much.
>>>> >> > > > > > > > * Users would need to specify the VIEW exactly right so that it can be
>>>> >> > > > > > > > used as a temporal table. See 1.1 for why this is not trivial.
>>>> >> > > > > > > >
>>>> >> > > > > > > > There are two ways to use a TableFunction:
>>>> >> > > > > > > >
>>>> >> > > > > > > > ### 1.3.1 Built-in and pre-registered function that is parameterized in the SQL query
>>>> >> > > > > > > >
>>>> >> > > > > > > > Here, we do not need to do anything to register the function. We simply
>>>> >> > > > > > > > use it in the query (see example in 2.2 below).
>>>> >> > > > > > > >
>>>> >> > > > > > > > ### 1.3.2 Parameterize function when it is registered in the catalog (with a provided Java implementation)
>>>> >> > > > > > > >
>>>> >> > > > > > > > This is the approach we've used so far. In the Table API, the function
>>>> >> > > > > > > > is first parameterized and created and then registered.
>>>> >> > > > > > > > We would need a DDL syntax to parameterize UDFs on registration.
I don't want to propose a syntax here, but just to give an idea, it might look like this:

    CREATE FUNCTION rates AS
      'org.apache.flink.table.udfs.TemporalTableFunction' WITH
      ('table' = 'rates_history', 'key' = 'cur', 'time' = 'rowtime')

Right now, the Flink Catalog interface does not have the functionality to store such parameters and would need some hacks to properly create parameterized function instances.

# 2 Defining a join of an append-only table and a temporal table

The append-only table needs to have a time attribute (processing time or event time, but the same as the temporal table).
The join then needs to specify two things:
* an equality predicate that includes the primary key of the temporal table
* the time attribute of the append-only table as the time as of which to look up the temporal table, i.e., get the version of the temporal table that is valid for the timestamp of the current row from the append-only table

The tricky part (from a syntax point of view) is to specify the lookup time.
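To make the two join requirements above concrete (an equality predicate on the temporal table's key, plus an "as of" lookup driven by the time attribute of the append-only row), here is a small Python sketch of the intended semantics. It only illustrates the semantics and says nothing about how Flink implements the join; the helper names `build_versions` and `temporal_join` and the sample rows are made up for this example.

```python
from bisect import bisect_right
from collections import defaultdict


def build_versions(history, key, time):
    """Group append-only history rows by key, each group sorted by version time."""
    versions = defaultdict(list)
    for row in sorted(history, key=lambda r: r[time]):
        versions[row[key]].append(row)
    return versions


def temporal_join(left_rows, versions, key, left_time, right_time):
    """For each left row, emit the right-side version valid at the left row's time."""
    out = []
    for row in left_rows:
        candidates = versions.get(row[key], [])
        times = [v[right_time] for v in candidates]
        idx = bisect_right(times, row[left_time])  # number of versions with time <= lookup time
        if idx > 0:  # inner-join semantics: drop rows that have no valid version yet
            out.append({**row, **candidates[idx - 1]})
    return out


# Hypothetical sample data: an append-only rates history and an orders table.
rates_history = [
    {"currency": "EUR", "rate": 1.10, "rowtime": 1},
    {"currency": "EUR", "rate": 1.20, "rowtime": 5},
    {"currency": "USD", "rate": 1.00, "rowtime": 2},
]
orders = [
    {"order_id": 1, "currency": "EUR", "ordertime": 3},
    {"order_id": 2, "currency": "EUR", "ordertime": 7},
    {"order_id": 3, "currency": "USD", "ordertime": 1},
]

versions = build_versions(rates_history, key="currency", time="rowtime")
joined = temporal_join(orders, versions, "currency", "ordertime", "rowtime")
```

In this sketch, order 1 picks up the EUR rate that became valid at time 1, order 2 picks up the one from time 5, and order 3 is dropped because no USD version exists yet at time 1.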
## 2.1 the temporal table is a regular table or view (see approaches 1.1 and 1.2 above)

In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as follows:

    SELECT *
    FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
    WHERE o.currency = r.currency

IMO, this is a great syntax and the one we should strive for.
We would need to bend the rules of the SQL standard, which only allows x in "FOR SYSTEM_TIME AS OF x" to be a constant, and the table on which it is applied usually needs to be of a specific type (not sure if views are supported), but I guess this is fine.
NOTE: the "FOR SYSTEM_TIME AS OF x" clause is already supported for LookupTable joins if x is a processing time attribute [2].
## 2.2 the temporal table is a TableFunction and parameterized in the query (see 1.3.1 above)

    SELECT *
    FROM orders o,
      TEMPORAL_TABLE(
        table => TABLE(rates_history),
        key => DESCRIPTOR(currency),
        time => DESCRIPTOR(rowtime)) r
    WHERE o.currency = r.currency

The function "TEMPORAL_TABLE" is built-in and nothing was registered in the catalog (except the rates_history table).
In fact, this is valid SQL:2016 syntax and is called Polymorphic Table Functions. Have a look here [3].

## 2.3 the temporal table is a TableFunction that was parameterized during registration (see 1.3.2 above)

This is what we have at the moment.

    SELECT *
    FROM orders o,
      LATERAL TABLE (rates(o.ordertime)) r
    WHERE o.currency = r.currency

The TableFunction "rates" was registered in the catalog and parameterized to the "rates_history" append-only table, the key was set to "currency", and the time attribute was declared.
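The difference between 2.2 and 2.3 is only where the table, key, and time attribute get bound. A rough Python analogy for 2.3, assuming a made-up factory `make_temporal_table_function` (this is not Flink's API), binds those three parameters once and leaves only the lookup timestamp as the call-time argument, like `rates(o.ordertime)` above:

```python
from bisect import bisect_right


def make_temporal_table_function(history, key, time):
    """Bind table, key and time attribute once, as done at registration in 1.3.2."""
    rows = sorted(history, key=lambda r: r[time])
    times = [r[time] for r in rows]

    def table_function(lookup_time):
        """Return the table version valid at lookup_time: the latest row per key."""
        visible = rows[:bisect_right(times, lookup_time)]
        latest = {}
        for r in visible:  # later rows overwrite earlier rows with the same key
            latest[r[key]] = r
        return list(latest.values())

    return table_function


# Hypothetical append-only history table.
rates_history = [
    {"currency": "EUR", "rate": 1.10, "rowtime": 1},
    {"currency": "USD", "rate": 1.00, "rowtime": 2},
    {"currency": "EUR", "rate": 1.20, "rowtime": 5},
]

# "Registration": parameters are fixed here, the query only passes a timestamp.
rates = make_temporal_table_function(rates_history, key="currency", time="rowtime")
snapshot = rates(3)
```

In the 2.2 variant, the same three parameters would instead be passed inside the query on every use, so nothing besides the history table has to exist in the catalog.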
# SUMMARY

IMO we should in the long run aim to define temporal tables either as upsert/retraction tables or as views on append-only tables and join them using the "FOR SYSTEM_TIME AS OF x" syntax.
I guess it is debatable whether we need to declare that history is tracked for these tables (which we don't actually do) or whether we do it by convention if the table has a time attribute.
It should be (relatively) easy to get this to work for retraction tables, which will be supported soon.
It will be more work for views because we need to improve the time attribute handling with MAX() aggregations.
The "FOR SYSTEM_TIME AS OF x" clause is already supported for LookupTableSources and would "only" need to be adapted to work on temporal tables.

Registering parameterized TableFunctions in the catalog seems like quite a bit of work. We need new DDL syntax and have to extend the catalog and function instantiation. This won't be easy, IMO.
If we only support them as TEMPORARY FUNCTIONs, which are not registered in the catalog, it will be easier. The question is whether it is worth the effort if we decide for the other approach.
Using TableFunctions that are parameterized in the query will require extending the Calcite parser and framework to support Polymorphic Table Functions.
However, some work might already have been done there, because AFAIK Apache Beam aims to support this syntax for windowing functions as described in the "One SQL to rule them all" paper [4].
It might be the fastest and fully SQL standard compliant way.

Cheers,
Fabian

[1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
[3] https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
[4] https://arxiv.org/abs/1905.12133

On Fri, 17 Apr 2020 at 06:37, Jark Wu <imj...@gmail.com> wrote:

Hi Konstantin,

Thanks for bringing up this discussion.
I think temporal join is a very important feature and should be exposed to pure SQL users.
I have already received many requests like this.
However, my concern is how to properly support this feature in SQL.
Introducing a DDL syntax for Temporal Table Function is one way, but maybe not the best one.

The most important reason is that what underlies a temporal table function is exactly a changelog stream.
The temporal join is actually a join of a fact stream with the changelog stream on processing time or event time.
We will soon support creating a changelog source using DDL once FLIP-95 and FLIP-105 are finished.
At that time, we can have a simple DDL to create a changelog source like this:

    CREATE TABLE rate_changelog (
      currency STRING,
      rate DECIMAL
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'rate_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'debezium-json'
    );

Meanwhile, we already have a SQL standard temporal join syntax [1], i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
It is currently used for dimension table lookup joins, but the semantics are the same as for the "temporal table function join" [2].
I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more natural, since the definition of B is a *table* not a *table function*, and the syntax is included in the SQL standard.

So once we have the ability to define the "rate_changelog" table, we can use the following query to temporal join the changelog on processing time.

    SELECT *
    FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
    ON orders.currency = rate_changelog.currency;

In a nutshell, once FLIP-95 and FLIP-105 are ready, we can easily support "temporal join on changelogs" without introducing new syntax.
IMO, introducing a DDL syntax for Temporal Table Function does not look like an easy way and may lead to repetitive work.
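As a rough illustration of what "temporal join on a changelog at processing time" means, the following Python sketch applies changelog records to a keyed table and lets each order see whatever version is current when it arrives. The record layout (`op`, `row`) and the function names are simplifications invented for this example; they are not the Debezium format or Flink's operator logic.

```python
def apply_change(state, change):
    """Apply one changelog record to the materialized table (keyed by currency)."""
    op, row = change["op"], change["row"]
    if op in ("insert", "update"):
        state[row["currency"]] = row
    elif op == "delete":
        state.pop(row["currency"], None)


def processing_time_join(events):
    """Process interleaved changelog records and orders in arrival order."""
    state, out = {}, []
    for kind, payload in events:
        if kind == "change":
            apply_change(state, payload)
        else:  # an order row: look up the version that is current on arrival
            match = state.get(payload["currency"])
            if match is not None:
                out.append({**payload, "rate": match["rate"]})
    return out


# Hypothetical arrival order of changelog records and orders.
events = [
    ("change", {"op": "insert", "row": {"currency": "EUR", "rate": 1.10}}),
    ("order", {"order_id": 1, "currency": "EUR"}),
    ("change", {"op": "update", "row": {"currency": "EUR", "rate": 1.20}}),
    ("order", {"order_id": 2, "currency": "EUR"}),
    ("change", {"op": "delete", "row": {"currency": "EUR"}}),
    ("order", {"order_id": 3, "currency": "EUR"}),
]
result = processing_time_join(events)
```

Order 1 sees the inserted rate, order 2 sees the updated one, and order 3 finds no match because the row was deleted before it arrived.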
Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function

On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:

Hi Konstantin,

Thanks for bringing up this discussion. +1 for the idea.
We have run into this in our company too, and I have been planning to support it in our internal branch.

Regarding your questions:
1) I think it might be more of a table/view than a function, just like a Temporal Table (which is also known as a dimension table). Maybe we need a DDL like CREATE VIEW plus some additional settings.
2) If we design the DDL for it like a view, then maybe temporary is good enough.
On Thu, 16 Apr 2020 at 20:16, Konstantin Knauf <kna...@apache.org> wrote:

Hi everyone,

it would be very useful if temporal tables could be created via DDL.
Currently, users need to do this either in the Table API or in the environment file of the Flink CLI, both of which require the user to switch out of the context of the SQL CLI/Editor. I recently created a ticket for this request [1].

I see two main questions:

1) What would be the DDL syntax? A Temporal Table is on the one hand a view and on the other a function, depending on how you look at it.

2) Would this temporal table view/function be stored in the catalog or only be temporary?

I personally do not have much experience in this area of Flink, so I am looking forward to hearing your thoughts on this.
Best,

Konstantin

[1] https://issues.apache.org/jira/browse/FLINK-16824

--
Konstantin Knauf

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk